diff options
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java')
-rw-r--r-- | ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java | 65 |
1 files changed, 30 insertions, 35 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java index 037dd54..b251f0f 100644 --- a/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java +++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2022 CAPGEMINI ENGINEERING. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.onap.ranapp.kafka.listener; import java.time.Duration; import java.util.LinkedList; @@ -34,25 +49,17 @@ public class RanAppEventConsumer { private final Logger logger = LoggerFactory.getLogger(RanAppEventConsumer.class); public RanAppEventConsumer() { } - - @Value("${ranapp.hokpi1_threshold}") private int hokpiThreshold; - @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; ObjectMapper objectMapper = new ObjectMapper(); - @Autowired KafkaTemplate<String, Object> Kafkatemplate; - @Autowired WebsocketClient websocketclient; - ResponsetoA1 response=new ResponsetoA1(); - - public static String requestTopicName,responseTopicName; - + public static String requestTopicName,responseTopicName,kafkakeyName; //@KafkaListener(topics = "#{topicUtil.suffixTopics()}", groupId = "${ranapp.testing.topic.id}", containerFactory = "kafkaListenerFactory") public void consume(KafkaConsumer<String,String> consumer, String requesttopicName, String responsetopicName) { try { @@ -60,15 +67,17 @@ public class RanAppEventConsumer { responseTopicName=responsetopicName; WebsocketClient.requestTopicName=requesttopicName; WebsocketClient.responseTopicName=responsetopicName; - while(true){ - ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000)); + ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(5000)); for(ConsumerRecord<String,String> record: records){ - logger.info("Key: "+ record.key() + ", Value:" +record.value()); + String msg=record.value().translateEscapes().strip().stripIndent().trim().replaceAll("(?m)^[ \t]*\r?\n", ""); + msg=msg.substring( 1, msg.length() - 1 ); + logger.info("Key: "+ record.key() + ", Value:" +msg); logger.info("Partition:" + record.partition()+",Offset:"+record.offset()); - JSONObject obj_name = new JSONObject(record.value()); + JSONObject obj_name = new JSONObject(msg); logger.info(String.format("ranapp message recieved \n-> %s \n", obj_name)); - + kafkakeyName=record.key(); + WebsocketClient.kafkakeyName=record.key(); ANRInputMessage ANRInputMessage=null; try { logger.info(String.format("Parsing the incoming json object to ANRInputMessage class")); @@ -77,7 +86,6 @@ public class RanAppEventConsumer { logger.info(String.format( "Invalid Message received . Exception while parsing JSON object -> %s", ex.getLocalizedMessage())); - ex.printStackTrace(); return; } @@ -86,14 +94,12 @@ public class RanAppEventConsumer { ANROutputMessage ANROutputMessage= convertInputMessagetoANROutput(ANRInputMessage); logger.info(String.format("ANROutputMessage")); logger.info(String.format(objectMapper.writeValueAsString(ANROutputMessage))); - // Output Payload is sent as Websocket message to RC DeviceData devicedata=new DeviceData(); devicedata.setMessage(objectMapper.writeValueAsString(ANROutputMessage.getPayload())); devicedata.setType(MessageType.KAFKA_MSG); logger.info(String.format("Sending Output Payload to RC WS Server")); websocketclient.sendMessage(devicedata); - } catch (Exception e) { e.getLocalizedMessage(); e.printStackTrace(); @@ -108,9 +114,6 @@ public class RanAppEventConsumer { // TODO Auto-generated method stub logger.info(String.format("Creating OutputMessage from ANRInput Function")); ANROutputMessage ANROutputMessage=new ANROutputMessage(); - - - logger.info(String.format("initializing output parameters")); //initializing Output Params PayloadOutput payloadOutput=new PayloadOutput(); @@ -124,21 +127,18 @@ public class RanAppEventConsumer { Common common=new Common(); NeighborListInUse neighbourlistinuse=new NeighborListInUse(); List<LTECell> listltecell=new LinkedList<LTECell>(); - - - logger.info(String.format("Iterating Neighbour cells to validate KPI")); // Parsing the payload.policydata into output message payload - common.setCellIdentity(ANRInputMessage.getPayload().getPolicyData().getCellID()); - int length=ANRInputMessage.getPayload().getPolicyData().getNeighbours().size(); + common.setCellIdentity(ANRInputMessage.getPayload().getCellID()); + int length=ANRInputMessage.getPayload().getNeighbours().size(); for(int i=0;i<length;i++) { LTECell ltecell=new LTECell(); - ltecell.setIdGNBCUCPFunction(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getPNFName()); - ltecell.setPlmnid(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getPlmnid()); + ltecell.setIdGNBCUCPFunction(ANRInputMessage.getPayload().getNeighbours().get(i).getPNFName()); + ltecell.setPlmnid(ANRInputMessage.getPayload().getNeighbours().get(i).getPlmnid()); ltecell.setnRTCI(0); - ltecell.setIdNRCellRelation(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getCellID()); + ltecell.setIdNRCellRelation(ANRInputMessage.getPayload().getNeighbours().get(i).getCellID()); //Identify KPI Measurement - if(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getHoKpi1() < hokpiThreshold) + if(ANRInputMessage.getPayload().getNeighbours().get(i).getHoKpi1() < hokpiThreshold) ltecell.setIsHOAllowed("true"); else ltecell.setIsHOAllowed("false"); @@ -150,20 +150,15 @@ public class RanAppEventConsumer { ran.setNeighborListInUse(neighbourlistinuse); lte.setRan(ran); cellconfig.setLte(lte); - fapservice.setIdNRCellCU(ANRInputMessage.getPayload().getPolicyData().getPNFName()); + fapservice.setIdNRCellCU(ANRInputMessage.getPayload().getPNFName()); fapservice.setCellConfig(cellconfig); - data.setRicId(ANRInputMessage.getPayload().getRicId()); data.setFAPService(fapservice); configuration.setData(data); //how do we iterate in a standard way, currently working in a static way listconfiguration.add(configuration); payloadOutput.setConfigurations(listconfiguration); - - ANROutputMessage.setPayload(payloadOutput); - logger.info(String.format("returning back output msg")); return ANROutputMessage; } - } |