summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java234
1 files changed, 234 insertions, 0 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
new file mode 100644
index 0000000..612ec3f
--- /dev/null
+++ b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2016 Cisco and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ranapp.websocket;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.EncodeException;
+import javax.websocket.OnClose;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+import javax.websocket.server.PathParam;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.json.JSONObject;
+import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1;
+import org.onap.ranapp.models.DeviceData;
+import org.onap.ranapp.models.DeviceDataDecoder;
+import org.onap.ranapp.models.DeviceDataEncoder;
+import org.onap.ranapp.models.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+import com.fasterxml.jackson.databind.ObjectMapper;
+//import src.main.java.com.wipro.www.websocket.Exception;
+@Component
+@ClientEndpoint(encoders = {DeviceDataEncoder.class}, decoders = {DeviceDataDecoder.class})
+public class WebsocketClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebsocketClient.class);
+ private Session clientSession;
+ private String url;
+ @Value("${websocketserver.address}")
+ private String websocketaddress;
+
+ @Value("${websocketserver.endpoint}")
+ private String websocketendpoint;
+
+ @Value("${ranapp.name}")
+ private String ranappName;
+ @Value(value = "${kafka.bootstrapAddress}")
+ private String bootstrapAddress;
+
+ @Autowired
+ KafkaTemplate<String, Object> Kafkatemplate;
+ ResponsetoA1 response=new ResponsetoA1();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ public static String requestTopicName,responseTopicName;
+
+ @OnOpen
+ public void onOpen(Session session,@PathParam("IpPort") String ipPort) throws IOException {
+
+
+ LOG.info("WebSocket opened (jetty server) Session Id: {}", session.getId());
+ DeviceData devicedata=new DeviceData();
+ devicedata.setType(MessageType.INITIAL_CONFIRMATION);
+
+ devicedata.setMessage("------------Connection Established to RAN_SIM from "+ ranappName +" ------------");
+ try {
+ session.getBasicRemote().sendObject(devicedata);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ LOG.error("Exception while sending message {}", e.getMessage());
+ } catch (EncodeException e) {
+ // TODO Auto-generated catch block
+ LOG.error("Exception while sending message {}", e.getMessage());
+ }
+ }
+ @OnClose
+ public void OnClose(CloseReason reason, Session session) {
+ LOG.info("Websocket {} closed. Close reason {}. If closed unintentionally, Retrying...", session.getId(),
+ reason.getReasonPhrase());
+ clientSession = null;
+ new RetryWebsocket(this).start();
+ }
+
+ @OnMessage
+ public void onMessage(DeviceData deviceData, Session session) {
+ try {
+ if (deviceData != null) {
+ if (deviceData.getMessage() == null || deviceData.getMessage().trim().equals("")) {
+ LOG.debug("Periodic ping message.... ignore");
+ return;
+ } else {
+ if (deviceData.getType().equals(MessageType.KAFKA_MSG)) {
+ LOG.info("Message Received: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(),
+ session.getId());
+ produce(deviceData.getMessage());
+
+
+ }
+ }
+
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in processing message {}", e.getMessage());
+ }
+ }
+
+
+ public void sendMessage(DeviceData deviceData) {
+ try {
+ LOG.info("Message to Send: Type:{}, msg:{}, Session Id:{}", deviceData.getType(),
+ deviceData.getMessage(), clientSession.getId());
+ if (clientSession != null) {
+ clientSession.getBasicRemote().sendObject(deviceData);
+ } else {
+ LOG.error("Could not get websocket client session!!");
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while sending message {}", e.getMessage());
+ }
+ }
+
+ public void initWebsocketClient() {
+ LOG.info("Initializing web socket client");
+ this.url = "ws://" + websocketaddress + websocketendpoint ;
+ this.ranappName= ranappName;
+ new RetryWebsocket(this).start();
+ }
+ class RetryWebsocket extends Thread {
+ WebsocketClient client;
+ public RetryWebsocket(WebsocketClient client) {
+ this.client = client;
+ }
+ public void run() {
+ boolean toContinue = true;
+ while (toContinue) {
+ try {
+ client.connectWebsocket();
+ toContinue = false;
+ LOG.info("Connection established, Stopping RetryWebsocket");
+ } catch (Exception e) {
+ LOG.info("Failed to connect to server, Retrying...");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+ public void connectWebsocket() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+ LOG.info("Connecting to Server:"+ url);
+
+ clientSession = container.connectToServer(this, new URI(url));
+ LOG.info("Connected to server, Session id: {}", clientSession.getId());
+ }
+ public void start() {
+ // TODO Auto-generated method stub
+
+
+ initWebsocketClient();
+ }
+ public void produce(String responsetoA1) {
+ JSONObject obj_name = new JSONObject(responsetoA1);
+ LOG.info(String.format("response message recieved from Ransim \n-> %s \n", obj_name));
+
+ try {
+
+
+ response = objectMapper.readValue(obj_name.toString(), ResponsetoA1.class);
+
+ LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName));
+ if(ObjectUtils.isEmpty(Kafkatemplate)) {
+ LOG.info(String.format("Initializing Kakfa producer factory"));
+ Map<String,Object> config = new HashMap<>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ ProducerFactory<String,Object> producerfactory= new DefaultKafkaProducerFactory(config);
+ Kafkatemplate=new KafkaTemplate<String, Object>(producerfactory);
+
+ }
+
+ ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, response);
+ //Kafkatemplate.send(responseTopicName, response);
+ future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+ @Override
+ public void onSuccess(SendResult<String, Object> result) {
+ System.out.println("Sent message=[" + response.toString() +
+ "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "]");
+ }
+ @Override
+ public void onFailure(Throwable ex) {
+ System.out.println("Unable to send message=["
+ + response + "] due to : " + ex.getMessage());
+ }
+ });
+
+
+ } catch (Exception ex) {
+ LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName));
+ LOG.info(String.format(
+ "Invalid Message received . Exception while parsing JSON object -> %s",
+ ex.getLocalizedMessage()));
+
+ ex.printStackTrace();
+ return;
+ }
+ }
+}