diff options
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java')
-rw-r--r-- | ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java new file mode 100644 index 0000000..612ec3f --- /dev/null +++ b/ANR-App/src/main/java/org/onap/ranapp/websocket/WebsocketClient.java @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2016 Cisco and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ranapp.websocket; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import javax.websocket.ClientEndpoint; +import javax.websocket.CloseReason; +import javax.websocket.ContainerProvider; +import javax.websocket.EncodeException; +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import javax.websocket.server.PathParam; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.json.JSONObject; +import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1; +import org.onap.ranapp.models.DeviceData; +import org.onap.ranapp.models.DeviceDataDecoder; +import org.onap.ranapp.models.DeviceDataEncoder; +import org.onap.ranapp.models.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import com.fasterxml.jackson.databind.ObjectMapper; +//import src.main.java.com.wipro.www.websocket.Exception; +@Component +@ClientEndpoint(encoders = {DeviceDataEncoder.class}, decoders = {DeviceDataDecoder.class}) +public class WebsocketClient { + + private static final Logger LOG = LoggerFactory.getLogger(WebsocketClient.class); + private Session clientSession; + private String url; + @Value("${websocketserver.address}") + private String websocketaddress; + + @Value("${websocketserver.endpoint}") + private String websocketendpoint; + + @Value("${ranapp.name}") + private String ranappName; + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + + @Autowired + KafkaTemplate<String, Object> Kafkatemplate; + ResponsetoA1 response=new ResponsetoA1(); + + ObjectMapper objectMapper = new ObjectMapper(); + + public static String requestTopicName,responseTopicName; + + @OnOpen + public void onOpen(Session session,@PathParam("IpPort") String ipPort) throws IOException { + + + LOG.info("WebSocket opened (jetty server) Session Id: {}", session.getId()); + DeviceData devicedata=new DeviceData(); + devicedata.setType(MessageType.INITIAL_CONFIRMATION); + + devicedata.setMessage("------------Connection Established to RAN_SIM from "+ ranappName +" ------------"); + try { + session.getBasicRemote().sendObject(devicedata); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.error("Exception while sending message {}", e.getMessage()); + } catch (EncodeException e) { + // TODO Auto-generated catch block + LOG.error("Exception while sending message {}", e.getMessage()); + } + } + @OnClose + public void OnClose(CloseReason reason, Session session) { + LOG.info("Websocket {} closed. Close reason {}. If closed unintentionally, Retrying...", session.getId(), + reason.getReasonPhrase()); + clientSession = null; + new RetryWebsocket(this).start(); + } + + @OnMessage + public void onMessage(DeviceData deviceData, Session session) { + try { + if (deviceData != null) { + if (deviceData.getMessage() == null || deviceData.getMessage().trim().equals("")) { + LOG.debug("Periodic ping message.... ignore"); + return; + } else { + if (deviceData.getType().equals(MessageType.KAFKA_MSG)) { + LOG.info("Message Received: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(), + session.getId()); + produce(deviceData.getMessage()); + + + } + } + + } + } catch (Exception e) { + LOG.error("Exception in processing message {}", e.getMessage()); + } + } + + + public void sendMessage(DeviceData deviceData) { + try { + LOG.info("Message to Send: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), + deviceData.getMessage(), clientSession.getId()); + if (clientSession != null) { + clientSession.getBasicRemote().sendObject(deviceData); + } else { + LOG.error("Could not get websocket client session!!"); + } + } catch (Exception e) { + LOG.error("Exception while sending message {}", e.getMessage()); + } + } + + public void initWebsocketClient() { + LOG.info("Initializing web socket client"); + this.url = "ws://" + websocketaddress + websocketendpoint ; + this.ranappName= ranappName; + new RetryWebsocket(this).start(); + } + class RetryWebsocket extends Thread { + WebsocketClient client; + public RetryWebsocket(WebsocketClient client) { + this.client = client; + } + public void run() { + boolean toContinue = true; + while (toContinue) { + try { + client.connectWebsocket(); + toContinue = false; + LOG.info("Connection established, Stopping RetryWebsocket"); + } catch (Exception e) { + LOG.info("Failed to connect to server, Retrying..."); + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } + } + } + public void connectWebsocket() throws Exception { + WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + LOG.info("Connecting to Server:"+ url); + + clientSession = container.connectToServer(this, new URI(url)); + LOG.info("Connected to server, Session id: {}", clientSession.getId()); + } + public void start() { + // TODO Auto-generated method stub + + + initWebsocketClient(); + } + public void produce(String responsetoA1) { + JSONObject obj_name = new JSONObject(responsetoA1); + LOG.info(String.format("response message recieved from Ransim \n-> %s \n", obj_name)); + + try { + + + response = objectMapper.readValue(obj_name.toString(), ResponsetoA1.class); + + LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); + if(ObjectUtils.isEmpty(Kafkatemplate)) { + LOG.info(String.format("Initializing Kakfa producer factory")); + Map<String,Object> config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + ProducerFactory<String,Object> producerfactory= new DefaultKafkaProducerFactory(config); + Kafkatemplate=new KafkaTemplate<String, Object>(producerfactory); + + } + + ListenableFuture<SendResult<String, Object>> future = Kafkatemplate.send(responseTopicName, response); + //Kafkatemplate.send(responseTopicName, response); + future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { + @Override + public void onSuccess(SendResult<String, Object> result) { + System.out.println("Sent message=[" + response.toString() + + "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "]"); + } + @Override + public void onFailure(Throwable ex) { + System.out.println("Unable to send message=[" + + response + "] due to : " + ex.getMessage()); + } + }); + + + } catch (Exception ex) { + LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); + LOG.info(String.format( + "Invalid Message received . Exception while parsing JSON object -> %s", + ex.getLocalizedMessage())); + + ex.printStackTrace(); + return; + } + } +} |