summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java65
1 files changed, 30 insertions, 35 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java
index 037dd54..b251f0f 100644
--- a/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java
+++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (C) 2022 CAPGEMINI ENGINEERING.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.onap.ranapp.kafka.listener;
import java.time.Duration;
import java.util.LinkedList;
@@ -34,25 +49,17 @@ public class RanAppEventConsumer {
private final Logger logger = LoggerFactory.getLogger(RanAppEventConsumer.class);
public RanAppEventConsumer() {
}
-
-
@Value("${ranapp.hokpi1_threshold}")
private int hokpiThreshold;
-
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
ObjectMapper objectMapper = new ObjectMapper();
-
@Autowired
KafkaTemplate<String, Object> Kafkatemplate;
-
@Autowired
WebsocketClient websocketclient;
-
ResponsetoA1 response=new ResponsetoA1();
-
- public static String requestTopicName,responseTopicName;
-
+ public static String requestTopicName,responseTopicName,kafkakeyName;
//@KafkaListener(topics = "#{topicUtil.suffixTopics()}", groupId = "${ranapp.testing.topic.id}", containerFactory = "kafkaListenerFactory")
public void consume(KafkaConsumer<String,String> consumer, String requesttopicName, String responsetopicName) {
try {
@@ -60,15 +67,17 @@ public class RanAppEventConsumer {
responseTopicName=responsetopicName;
WebsocketClient.requestTopicName=requesttopicName;
WebsocketClient.responseTopicName=responsetopicName;
-
while(true){
- ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
+ ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(5000));
for(ConsumerRecord<String,String> record: records){
- logger.info("Key: "+ record.key() + ", Value:" +record.value());
+ String msg=record.value().translateEscapes().strip().stripIndent().trim().replaceAll("(?m)^[ \t]*\r?\n", "");
+ msg=msg.substring( 1, msg.length() - 1 );
+ logger.info("Key: "+ record.key() + ", Value:" +msg);
logger.info("Partition:" + record.partition()+",Offset:"+record.offset());
- JSONObject obj_name = new JSONObject(record.value());
+ JSONObject obj_name = new JSONObject(msg);
logger.info(String.format("ranapp message recieved \n-> %s \n", obj_name));
-
+ kafkakeyName=record.key();
+ WebsocketClient.kafkakeyName=record.key();
ANRInputMessage ANRInputMessage=null;
try {
logger.info(String.format("Parsing the incoming json object to ANRInputMessage class"));
@@ -77,7 +86,6 @@ public class RanAppEventConsumer {
logger.info(String.format(
"Invalid Message received . Exception while parsing JSON object -> %s",
ex.getLocalizedMessage()));
-
ex.printStackTrace();
return;
}
@@ -86,14 +94,12 @@ public class RanAppEventConsumer {
ANROutputMessage ANROutputMessage= convertInputMessagetoANROutput(ANRInputMessage);
logger.info(String.format("ANROutputMessage"));
logger.info(String.format(objectMapper.writeValueAsString(ANROutputMessage)));
-
// Output Payload is sent as Websocket message to RC
DeviceData devicedata=new DeviceData();
devicedata.setMessage(objectMapper.writeValueAsString(ANROutputMessage.getPayload()));
devicedata.setType(MessageType.KAFKA_MSG);
logger.info(String.format("Sending Output Payload to RC WS Server"));
websocketclient.sendMessage(devicedata);
-
} catch (Exception e) {
e.getLocalizedMessage();
e.printStackTrace();
@@ -108,9 +114,6 @@ public class RanAppEventConsumer {
// TODO Auto-generated method stub
logger.info(String.format("Creating OutputMessage from ANRInput Function"));
ANROutputMessage ANROutputMessage=new ANROutputMessage();
-
-
-
logger.info(String.format("initializing output parameters"));
//initializing Output Params
PayloadOutput payloadOutput=new PayloadOutput();
@@ -124,21 +127,18 @@ public class RanAppEventConsumer {
Common common=new Common();
NeighborListInUse neighbourlistinuse=new NeighborListInUse();
List<LTECell> listltecell=new LinkedList<LTECell>();
-
-
-
logger.info(String.format("Iterating Neighbour cells to validate KPI"));
// Parsing the payload.policydata into output message payload
- common.setCellIdentity(ANRInputMessage.getPayload().getPolicyData().getCellID());
- int length=ANRInputMessage.getPayload().getPolicyData().getNeighbours().size();
+ common.setCellIdentity(ANRInputMessage.getPayload().getCellID());
+ int length=ANRInputMessage.getPayload().getNeighbours().size();
for(int i=0;i<length;i++) {
LTECell ltecell=new LTECell();
- ltecell.setIdGNBCUCPFunction(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getPNFName());
- ltecell.setPlmnid(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getPlmnid());
+ ltecell.setIdGNBCUCPFunction(ANRInputMessage.getPayload().getNeighbours().get(i).getPNFName());
+ ltecell.setPlmnid(ANRInputMessage.getPayload().getNeighbours().get(i).getPlmnid());
ltecell.setnRTCI(0);
- ltecell.setIdNRCellRelation(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getCellID());
+ ltecell.setIdNRCellRelation(ANRInputMessage.getPayload().getNeighbours().get(i).getCellID());
//Identify KPI Measurement
- if(ANRInputMessage.getPayload().getPolicyData().getNeighbours().get(i).getHoKpi1() < hokpiThreshold)
+ if(ANRInputMessage.getPayload().getNeighbours().get(i).getHoKpi1() < hokpiThreshold)
ltecell.setIsHOAllowed("true");
else
ltecell.setIsHOAllowed("false");
@@ -150,20 +150,15 @@ public class RanAppEventConsumer {
ran.setNeighborListInUse(neighbourlistinuse);
lte.setRan(ran);
cellconfig.setLte(lte);
- fapservice.setIdNRCellCU(ANRInputMessage.getPayload().getPolicyData().getPNFName());
+ fapservice.setIdNRCellCU(ANRInputMessage.getPayload().getPNFName());
fapservice.setCellConfig(cellconfig);
- data.setRicId(ANRInputMessage.getPayload().getRicId());
data.setFAPService(fapservice);
configuration.setData(data);
//how do we iterate in a standard way, currently working in a static way
listconfiguration.add(configuration);
payloadOutput.setConfigurations(listconfiguration);
-
-
ANROutputMessage.setPayload(payloadOutput);
-
logger.info(String.format("returning back output msg"));
return ANROutputMessage;
}
-
}