diff options
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java')
-rw-r--r-- | ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java | 36 |
1 files changed, 4 insertions, 32 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java index 612ec3f..722d399 100644 --- a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java +++ b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Cisco and/or its affiliates. + * Copyright (C) 2022 CAPGEMINI ENGINEERING. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,37 +54,28 @@ import com.fasterxml.jackson.databind.ObjectMapper; @Component @ClientEndpoint(encoders = {DeviceDataEncoder.class}, decoders = {DeviceDataDecoder.class}) public class WebsocketClient { - private static final Logger LOG = LoggerFactory.getLogger(WebsocketClient.class); private Session clientSession; private String url; @Value("${websocketserver.address}") private String websocketaddress; - @Value("${websocketserver.endpoint}") private String websocketendpoint; - @Value("${ranapp.name}") private String ranappName; @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; - @Autowired KafkaTemplate<String, Object> Kafkatemplate; ResponsetoA1 response=new ResponsetoA1(); - ObjectMapper objectMapper = new ObjectMapper(); - public static String requestTopicName,responseTopicName; - + public static String kafkakeyName; @OnOpen public void onOpen(Session session,@PathParam("IpPort") String ipPort) throws IOException { - - LOG.info("WebSocket opened (jetty server) Session Id: {}", session.getId()); DeviceData devicedata=new DeviceData(); devicedata.setType(MessageType.INITIAL_CONFIRMATION); - devicedata.setMessage("------------Connection Established to RAN_SIM from "+ ranappName +" ------------"); try { session.getBasicRemote().sendObject(devicedata); @@ -103,7 +94,6 @@ public class WebsocketClient { clientSession = null; new RetryWebsocket(this).start(); } - @OnMessage public void onMessage(DeviceData deviceData, Session session) { try { @@ -116,18 +106,13 @@ public class WebsocketClient { LOG.info("Message Received: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(), session.getId()); produce(deviceData.getMessage()); - - } } - } } catch (Exception e) { LOG.error("Exception in processing message {}", e.getMessage()); } } - - public void sendMessage(DeviceData deviceData) { try { LOG.info("Message to Send: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), @@ -141,7 +126,6 @@ public class WebsocketClient { LOG.error("Exception while sending message {}", e.getMessage()); } } - public void initWebsocketClient() { LOG.info("Initializing web socket client"); this.url = "ws://" + websocketaddress + websocketendpoint ; @@ -174,25 +158,18 @@ public class WebsocketClient { public void connectWebsocket() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); LOG.info("Connecting to Server:"+ url); - clientSession = container.connectToServer(this, new URI(url)); LOG.info("Connected to server, Session id: {}", clientSession.getId()); } public void start() { // TODO Auto-generated method stub - - initWebsocketClient(); } public void produce(String responsetoA1) { JSONObject obj_name = new JSONObject(responsetoA1); LOG.info(String.format("response message recieved from Ransim \n-> %s \n", obj_name)); - try { - - response = objectMapper.readValue(obj_name.toString(), ResponsetoA1.class); - LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); if(ObjectUtils.isEmpty(Kafkatemplate)) { LOG.info(String.format("Initializing Kakfa producer factory")); @@ -202,16 +179,14 @@ public class WebsocketClient { config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); ProducerFactory<String,Object> producerfactory= new DefaultKafkaProducerFactory(config); Kafkatemplate=new KafkaTemplate<String, Object>(producerfactory); - } - - ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, response); + ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, kafkakeyName, response); //Kafkatemplate.send(responseTopicName, response); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("Sent message=[" + response.toString() + - "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "]"); + "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "with topic"+ kafkakeyName +"]"); } @Override public void onFailure(Throwable ex) { @@ -219,14 +194,11 @@ public class WebsocketClient { + response + "] due to : " + ex.getMessage()); } }); - - } catch (Exception ex) { LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); LOG.info(String.format( "Invalid Message received . Exception while parsing JSON object -> %s", ex.getLocalizedMessage())); - ex.printStackTrace(); return; } |