package org.onap.ranapp.kafka.listener; import java.time.Duration; import java.util.LinkedList; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.json.JSONObject; import org.onap.ranapp.kafka.model.appmodels.ANRInputMessage; import org.onap.ranapp.kafka.model.appmodels.ANROutputMessage; import org.onap.ranapp.kafka.model.appmodels.CellConfig; import org.onap.ranapp.kafka.model.appmodels.Common; import org.onap.ranapp.kafka.model.appmodels.Configuration; import org.onap.ranapp.kafka.model.appmodels.Data; import org.onap.ranapp.kafka.model.appmodels.FAPService; import org.onap.ranapp.kafka.model.appmodels.LTE; import org.onap.ranapp.kafka.model.appmodels.LTECell; import org.onap.ranapp.kafka.model.appmodels.NeighborListInUse; import org.onap.ranapp.kafka.model.appmodels.PayloadOutput; import org.onap.ranapp.kafka.model.appmodels.RAN; import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1; import org.onap.ranapp.models.DeviceData; import org.onap.ranapp.models.MessageType; import org.onap.ranapp.websocket.WebsocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @Service public class RanAppEventConsumer { private final Logger logger = LoggerFactory.getLogger(RanAppEventConsumer.class); public RanAppEventConsumer() { } @Value("${ranapp.hokpi1_threshold}") private int hokpiThreshold; @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; ObjectMapper objectMapper = new ObjectMapper(); @Autowired KafkaTemplate Kafkatemplate; @Autowired WebsocketClient websocketclient; ResponsetoA1 response=new ResponsetoA1(); public static String requestTopicName,responseTopicName; //@KafkaListener(topics = "#{topicUtil.suffixTopics()}", groupId = "${ranapp.testing.topic.id}", containerFactory = "kafkaListenerFactory") public void consume(KafkaConsumer consumer, String requesttopicName, String responsetopicName) { try { requestTopicName=requesttopicName; responseTopicName=responsetopicName; WebsocketClient.requestTopicName=requesttopicName; WebsocketClient.responseTopicName=responsetopicName; while(true){ ConsumerRecords records=consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record: records){ logger.info("Key: "+ record.key() + ", Value:" +record.value()); logger.info("Partition:" + record.partition()+",Offset:"+record.offset()); JSONObject obj_name = new JSONObject(record.value()); logger.info(String.format("ranapp message recieved \n-> %s \n", obj_name)); ANRInputMessage ANRInputMessage=null; try { logger.info(String.format("Parsing the incoming json object to ANRInputMessage class")); ANRInputMessage = objectMapper.readValue(obj_name.toString(), ANRInputMessage.class); } catch (Exception ex) { logger.info(String.format( "Invalid Message received . Exception while parsing JSON object -> %s", ex.getLocalizedMessage())); ex.printStackTrace(); return; } try { logger.info(String.format("Invoking convertInputMessagetoANROutput Function")); ANROutputMessage ANROutputMessage= convertInputMessagetoANROutput(ANRInputMessage); logger.info(String.format("ANROutputMessage")); logger.info(String.format(objectMapper.writeValueAsString(ANROutputMessage))); // Output Payload is sent as Websocket message to RC DeviceData devicedata=new DeviceData(); devicedata.setMessage(objectMapper.writeValueAsString(ANROutputMessage.getPayload())); devicedata.setType(MessageType.KAFKA_MSG); logger.info(String.format("Sending Output Payload to RC WS Server")); websocketclient.sendMessage(devicedata); } catch (Exception e) { e.getLocalizedMessage(); e.printStackTrace(); } } }}catch (Exception e) { e.getLocalizedMessage(); e.printStackTrace(); } } private ANROutputMessage convertInputMessagetoANROutput(ANRInputMessage ANRInputMessage) { // TODO Auto-generated method stub logger.info(String.format("Creating OutputMessage from ANRInput Function")); ANROutputMessage ANROutputMessage=new ANROutputMessage(); logger.info(String.format("initializing output parameters")); //initializing Output Params PayloadOutput payloadOutput=new PayloadOutput(); List listconfiguration= new LinkedList(); Configuration configuration=new Configuration(); Data data=new Data(); FAPService fapservice=new FAPService(); CellConfig cellconfig=new CellConfig(); LTE lte=new LTE(); RAN ran=new RAN(); Common common=new Common(); NeighborListInUse neighbourlistinuse=new NeighborListInUse(); List listltecell=new LinkedList(); logger.info(String.format("Iterating Neighbour cells to validate KPI")); // Parsing the payload.policydata into output message payload common.setCellIdentity(ANRInputMessage.getPayload().getPolicyData().getCellID()); int length=ANRInputMessage.getPayload().getPolicyData().getNeighbours().size(); for(int i=0;i