summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java36
1 files changed, 4 insertions, 32 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
index 612ec3f..722d399 100644
--- a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
+++ b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Cisco and/or its affiliates.
+ * Copyright (C) 2022 CAPGEMINI ENGINEERING.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,37 +54,28 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@Component
@ClientEndpoint(encoders = {DeviceDataEncoder.class}, decoders = {DeviceDataDecoder.class})
public class WebsocketClient {
-
private static final Logger LOG = LoggerFactory.getLogger(WebsocketClient.class);
private Session clientSession;
private String url;
@Value("${websocketserver.address}")
private String websocketaddress;
-
@Value("${websocketserver.endpoint}")
private String websocketendpoint;
-
@Value("${ranapp.name}")
private String ranappName;
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
-
@Autowired
KafkaTemplate<String, Object> Kafkatemplate;
ResponsetoA1 response=new ResponsetoA1();
-
ObjectMapper objectMapper = new ObjectMapper();
-
public static String requestTopicName,responseTopicName;
-
+ public static String kafkakeyName;
@OnOpen
public void onOpen(Session session,@PathParam("IpPort") String ipPort) throws IOException {
-
-
LOG.info("WebSocket opened (jetty server) Session Id: {}", session.getId());
DeviceData devicedata=new DeviceData();
devicedata.setType(MessageType.INITIAL_CONFIRMATION);
-
devicedata.setMessage("------------Connection Established to RAN_SIM from "+ ranappName +" ------------");
try {
session.getBasicRemote().sendObject(devicedata);
@@ -103,7 +94,6 @@ public class WebsocketClient {
clientSession = null;
new RetryWebsocket(this).start();
}
-
@OnMessage
public void onMessage(DeviceData deviceData, Session session) {
try {
@@ -116,18 +106,13 @@ public class WebsocketClient {
LOG.info("Message Received: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(),
session.getId());
produce(deviceData.getMessage());
-
-
}
}
-
}
} catch (Exception e) {
LOG.error("Exception in processing message {}", e.getMessage());
}
}
-
-
public void sendMessage(DeviceData deviceData) {
try {
LOG.info("Message to Send: Type:{}, msg:{}, Session Id:{}", deviceData.getType(),
@@ -141,7 +126,6 @@ public class WebsocketClient {
LOG.error("Exception while sending message {}", e.getMessage());
}
}
-
public void initWebsocketClient() {
LOG.info("Initializing web socket client");
this.url = "ws://" + websocketaddress + websocketendpoint ;
@@ -174,25 +158,18 @@ public class WebsocketClient {
public void connectWebsocket() throws Exception {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
LOG.info("Connecting to Server:"+ url);
-
clientSession = container.connectToServer(this, new URI(url));
LOG.info("Connected to server, Session id: {}", clientSession.getId());
}
public void start() {
// TODO Auto-generated method stub
-
-
initWebsocketClient();
}
public void produce(String responsetoA1) {
JSONObject obj_name = new JSONObject(responsetoA1);
LOG.info(String.format("response message recieved from Ransim \n-> %s \n", obj_name));
-
try {
-
-
response = objectMapper.readValue(obj_name.toString(), ResponsetoA1.class);
-
LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName));
if(ObjectUtils.isEmpty(Kafkatemplate)) {
LOG.info(String.format("Initializing Kakfa producer factory"));
@@ -202,16 +179,14 @@ public class WebsocketClient {
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String,Object> producerfactory= new DefaultKafkaProducerFactory(config);
Kafkatemplate=new KafkaTemplate<String, Object>(producerfactory);
-
}
-
- ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, response);
+ ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, kafkakeyName, response);
//Kafkatemplate.send(responseTopicName, response);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("Sent message=[" + response.toString() +
- "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "]");
+ "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "with topic"+ kafkakeyName +"]");
}
@Override
public void onFailure(Throwable ex) {
@@ -219,14 +194,11 @@ public class WebsocketClient {
+ response + "] due to : " + ex.getMessage());
}
});
-
-
} catch (Exception ex) {
LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName));
LOG.info(String.format(
"Invalid Message received . Exception while parsing JSON object -> %s",
ex.getLocalizedMessage()));
-
ex.printStackTrace();
return;
}