/* * Copyright (C) 2022 CAPGEMINI ENGINEERING. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onap.ranapp.kafka.listener; import java.time.Duration; import java.util.LinkedList; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.json.JSONObject; import org.onap.ranapp.kafka.model.appmodels.ANRInputMessage; import org.onap.ranapp.kafka.model.appmodels.ANROutputMessage; import org.onap.ranapp.kafka.model.appmodels.CellConfig; import org.onap.ranapp.kafka.model.appmodels.Common; import org.onap.ranapp.kafka.model.appmodels.Configuration; import org.onap.ranapp.kafka.model.appmodels.Data; import org.onap.ranapp.kafka.model.appmodels.FAPService; import org.onap.ranapp.kafka.model.appmodels.LTE; import org.onap.ranapp.kafka.model.appmodels.LTECell; import org.onap.ranapp.kafka.model.appmodels.NeighborListInUse; import org.onap.ranapp.kafka.model.appmodels.PayloadOutput; import org.onap.ranapp.kafka.model.appmodels.RAN; import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1; import org.onap.ranapp.models.DeviceData; import org.onap.ranapp.models.MessageType; import org.onap.ranapp.websocket.WebsocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @Service public class RanAppEventConsumer { private final Logger logger = LoggerFactory.getLogger(RanAppEventConsumer.class); public RanAppEventConsumer() { } @Value("${ranapp.hokpi1_threshold}") private int hokpiThreshold; @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; ObjectMapper objectMapper = new ObjectMapper(); @Autowired KafkaTemplate Kafkatemplate; @Autowired WebsocketClient websocketclient; ResponsetoA1 response=new ResponsetoA1(); public static String requestTopicName,responseTopicName,kafkakeyName; //@KafkaListener(topics = "#{topicUtil.suffixTopics()}", groupId = "${ranapp.testing.topic.id}", containerFactory = "kafkaListenerFactory") public void consume(KafkaConsumer consumer, String requesttopicName, String responsetopicName) { try { requestTopicName=requesttopicName; responseTopicName=responsetopicName; WebsocketClient.requestTopicName=requesttopicName; WebsocketClient.responseTopicName=responsetopicName; while(true){ ConsumerRecords records=consumer.poll(Duration.ofMillis(5000)); for(ConsumerRecord record: records){ String msg=record.value().translateEscapes().strip().stripIndent().trim().replaceAll("(?m)^[ \t]*\r?\n", ""); msg=msg.substring( 1, msg.length() - 1 ); logger.info("Key: "+ record.key() + ", Value:" +msg); logger.info("Partition:" + record.partition()+",Offset:"+record.offset()); JSONObject obj_name = new JSONObject(msg); logger.info(String.format("ranapp message recieved \n-> %s \n", obj_name)); kafkakeyName=record.key(); WebsocketClient.kafkakeyName=record.key(); ANRInputMessage ANRInputMessage=null; try { logger.info(String.format("Parsing the incoming json object to ANRInputMessage class")); ANRInputMessage = objectMapper.readValue(obj_name.toString(), ANRInputMessage.class); } catch (Exception ex) { logger.info(String.format( "Invalid Message received . Exception while parsing JSON object -> %s", ex.getLocalizedMessage())); ex.printStackTrace(); return; } try { logger.info(String.format("Invoking convertInputMessagetoANROutput Function")); ANROutputMessage ANROutputMessage= convertInputMessagetoANROutput(ANRInputMessage); logger.info(String.format("ANROutputMessage")); logger.info(String.format(objectMapper.writeValueAsString(ANROutputMessage))); // Output Payload is sent as Websocket message to RC DeviceData devicedata=new DeviceData(); devicedata.setMessage(objectMapper.writeValueAsString(ANROutputMessage.getPayload())); devicedata.setType(MessageType.KAFKA_MSG); logger.info(String.format("Sending Output Payload to RC WS Server")); websocketclient.sendMessage(devicedata); } catch (Exception e) { e.getLocalizedMessage(); e.printStackTrace(); } } }}catch (Exception e) { e.getLocalizedMessage(); e.printStackTrace(); } } private ANROutputMessage convertInputMessagetoANROutput(ANRInputMessage ANRInputMessage) { // TODO Auto-generated method stub logger.info(String.format("Creating OutputMessage from ANRInput Function")); ANROutputMessage ANROutputMessage=new ANROutputMessage(); logger.info(String.format("initializing output parameters")); //initializing Output Params PayloadOutput payloadOutput=new PayloadOutput(); List listconfiguration= new LinkedList(); Configuration configuration=new Configuration(); Data data=new Data(); FAPService fapservice=new FAPService(); CellConfig cellconfig=new CellConfig(); LTE lte=new LTE(); RAN ran=new RAN(); Common common=new Common(); NeighborListInUse neighbourlistinuse=new NeighborListInUse(); List listltecell=new LinkedList(); logger.info(String.format("Iterating Neighbour cells to validate KPI")); // Parsing the payload.policydata into output message payload common.setCellIdentity(ANRInputMessage.getPayload().getCellID()); int length=ANRInputMessage.getPayload().getNeighbours().size(); for(int i=0;i