/* * Copyright (c) 2016 Cisco and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onap.ranapp.websocket; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; import javax.websocket.ClientEndpoint; import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.EncodeException; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import javax.websocket.server.PathParam; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.json.JSONObject; import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1; import org.onap.ranapp.models.DeviceData; import org.onap.ranapp.models.DeviceDataDecoder; import org.onap.ranapp.models.DeviceDataEncoder; import org.onap.ranapp.models.MessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import com.fasterxml.jackson.databind.ObjectMapper; //import src.main.java.com.wipro.www.websocket.Exception; @Component @ClientEndpoint(encoders = {DeviceDataEncoder.class}, decoders = {DeviceDataDecoder.class}) public class WebsocketClient { private static final Logger LOG = LoggerFactory.getLogger(WebsocketClient.class); private Session clientSession; private String url; @Value("${websocketserver.address}") private String websocketaddress; @Value("${websocketserver.endpoint}") private String websocketendpoint; @Value("${ranapp.name}") private String ranappName; @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Autowired KafkaTemplate Kafkatemplate; ResponsetoA1 response=new ResponsetoA1(); ObjectMapper objectMapper = new ObjectMapper(); public static String requestTopicName,responseTopicName; @OnOpen public void onOpen(Session session,@PathParam("IpPort") String ipPort) throws IOException { LOG.info("WebSocket opened (jetty server) Session Id: {}", session.getId()); DeviceData devicedata=new DeviceData(); devicedata.setType(MessageType.INITIAL_CONFIRMATION); devicedata.setMessage("------------Connection Established to RAN_SIM from "+ ranappName +" ------------"); try { session.getBasicRemote().sendObject(devicedata); } catch (IOException e) { // TODO Auto-generated catch block LOG.error("Exception while sending message {}", e.getMessage()); } catch (EncodeException e) { // TODO Auto-generated catch block LOG.error("Exception while sending message {}", e.getMessage()); } } @OnClose public void OnClose(CloseReason reason, Session session) { LOG.info("Websocket {} closed. Close reason {}. If closed unintentionally, Retrying...", session.getId(), reason.getReasonPhrase()); clientSession = null; new RetryWebsocket(this).start(); } @OnMessage public void onMessage(DeviceData deviceData, Session session) { try { if (deviceData != null) { if (deviceData.getMessage() == null || deviceData.getMessage().trim().equals("")) { LOG.debug("Periodic ping message.... ignore"); return; } else { if (deviceData.getType().equals(MessageType.KAFKA_MSG)) { LOG.info("Message Received: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(), session.getId()); produce(deviceData.getMessage()); } } } } catch (Exception e) { LOG.error("Exception in processing message {}", e.getMessage()); } } public void sendMessage(DeviceData deviceData) { try { LOG.info("Message to Send: Type:{}, msg:{}, Session Id:{}", deviceData.getType(), deviceData.getMessage(), clientSession.getId()); if (clientSession != null) { clientSession.getBasicRemote().sendObject(deviceData); } else { LOG.error("Could not get websocket client session!!"); } } catch (Exception e) { LOG.error("Exception while sending message {}", e.getMessage()); } } public void initWebsocketClient() { LOG.info("Initializing web socket client"); this.url = "ws://" + websocketaddress + websocketendpoint ; this.ranappName= ranappName; new RetryWebsocket(this).start(); } class RetryWebsocket extends Thread { WebsocketClient client; public RetryWebsocket(WebsocketClient client) { this.client = client; } public void run() { boolean toContinue = true; while (toContinue) { try { client.connectWebsocket(); toContinue = false; LOG.info("Connection established, Stopping RetryWebsocket"); } catch (Exception e) { LOG.info("Failed to connect to server, Retrying..."); try { Thread.sleep(5000); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } } } } } public void connectWebsocket() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); LOG.info("Connecting to Server:"+ url); clientSession = container.connectToServer(this, new URI(url)); LOG.info("Connected to server, Session id: {}", clientSession.getId()); } public void start() { // TODO Auto-generated method stub initWebsocketClient(); } public void produce(String responsetoA1) { JSONObject obj_name = new JSONObject(responsetoA1); LOG.info(String.format("response message recieved from Ransim \n-> %s \n", obj_name)); try { response = objectMapper.readValue(obj_name.toString(), ResponsetoA1.class); LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); if(ObjectUtils.isEmpty(Kafkatemplate)) { LOG.info(String.format("Initializing Kakfa producer factory")); Map config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); ProducerFactory producerfactory= new DefaultKafkaProducerFactory(config); Kafkatemplate=new KafkaTemplate(producerfactory); } ListenableFuture> future = Kafkatemplate.send(responseTopicName, response); //Kafkatemplate.send(responseTopicName, response); future.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + response.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "Data" + result + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + response + "] due to : " + ex.getMessage()); } }); } catch (Exception ex) { LOG.info(String.format("Parsing the incoming json object to ResponsetoA1 class: "+ response +", kafkaresponse topic: "+responseTopicName)); LOG.info(String.format( "Invalid Message received . Exception while parsing JSON object -> %s", ex.getLocalizedMessage())); ex.printStackTrace(); return; } } }