summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/kafka/listener/RanAppEventConsumer.java
blob: b251f0fda2d2a08d57bb5ddca8189c8bc2ecb1e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
 * Copyright (C) 2022 CAPGEMINI ENGINEERING.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onap.ranapp.kafka.listener;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.onap.ranapp.kafka.model.appmodels.ANRInputMessage;
import org.onap.ranapp.kafka.model.appmodels.ANROutputMessage;
import org.onap.ranapp.kafka.model.appmodels.CellConfig;
import org.onap.ranapp.kafka.model.appmodels.Common;
import org.onap.ranapp.kafka.model.appmodels.Configuration;
import org.onap.ranapp.kafka.model.appmodels.Data;
import org.onap.ranapp.kafka.model.appmodels.FAPService;
import org.onap.ranapp.kafka.model.appmodels.LTE;
import org.onap.ranapp.kafka.model.appmodels.LTECell;
import org.onap.ranapp.kafka.model.appmodels.NeighborListInUse;
import org.onap.ranapp.kafka.model.appmodels.PayloadOutput;
import org.onap.ranapp.kafka.model.appmodels.RAN;
import org.onap.ranapp.kafka.model.appmodels.ResponsetoA1;
import org.onap.ranapp.models.DeviceData;
import org.onap.ranapp.models.MessageType;
import org.onap.ranapp.websocket.WebsocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Polls a Kafka consumer for ANR input messages, converts each one into an
 * {@link ANROutputMessage} and forwards the output payload to the RC websocket
 * server through {@link WebsocketClient}.
 *
 * <p>The topic names and the key of the most recently processed record are
 * published through static fields on this class and on {@link WebsocketClient}.
 */
@Service
public class RanAppEventConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RanAppEventConsumer.class);

    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(5000);

    public RanAppEventConsumer() {
    }

    /** Threshold the per-neighbour {@code hoKpi1} value is compared against. */
    @Value("${ranapp.hokpi1_threshold}")
    private int hokpiThreshold;

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    KafkaTemplate<String, Object> Kafkatemplate;

    @Autowired
    WebsocketClient websocketclient;

    ResponsetoA1 response = new ResponsetoA1();

    public static String requestTopicName, responseTopicName, kafkakeyName;

    //@KafkaListener(topics = "#{topicUtil.suffixTopics()}", groupId = "${ranapp.testing.topic.id}", containerFactory = "kafkaListenerFactory")
    /**
     * Polls {@code consumer} in an endless loop and processes every record received.
     *
     * <p>A record that cannot be cleaned up, parsed, converted or forwarded is
     * logged and skipped; polling continues with the next record. The loop only
     * ends when {@code poll} itself (or iteration over its result) throws, in
     * which case the failure is logged and the method returns.
     *
     * @param consumer          Kafka consumer to poll; it is not closed by this method
     * @param requesttopicName  request topic name, stored in the static fields of
     *                          this class and of {@link WebsocketClient}
     * @param responsetopicName response topic name, stored in the static fields of
     *                          this class and of {@link WebsocketClient}
     */
    public void consume(KafkaConsumer<String, String> consumer, String requesttopicName, String responsetopicName) {
        requestTopicName = requesttopicName;
        responseTopicName = responsetopicName;
        WebsocketClient.requestTopicName = requesttopicName;
        WebsocketClient.responseTopicName = responsetopicName;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> kafkaRecord : records) {
                    handleRecord(kafkaRecord);
                }
            }
        } catch (Exception e) {
            logger.error("Kafka polling loop terminated", e);
        }
    }

    /**
     * Processes one Kafka record: cleans up the value, parses it, converts it and
     * sends the resulting payload over the websocket.
     *
     * <p>Never throws: every failure is logged so that one bad record cannot stop
     * the polling loop in {@link #consume}.
     */
    private void handleRecord(ConsumerRecord<String, String> kafkaRecord) {
        try {
            String rawValue = kafkaRecord.value();
            if (rawValue == null) {
                logger.warn("Skipping record with null value. Partition:{},Offset:{}",
                        kafkaRecord.partition(), kafkaRecord.offset());
                return;
            }

            // Resolve escape sequences, trim, and drop blank lines.
            String msg = rawValue.translateEscapes().strip().stripIndent().trim()
                    .replaceAll("(?m)^[ \t]*\r?\n", "");
            if (msg.length() < 2) {
                // Too short to carry the leading/trailing character removed below.
                logger.warn("Skipping record with empty message. Partition:{},Offset:{}",
                        kafkaRecord.partition(), kafkaRecord.offset());
                return;
            }
            // Remove the first and last character of the cleaned-up value
            // (presumably enclosing quotes - TODO confirm against the producer).
            msg = msg.substring(1, msg.length() - 1);

            logger.info("Key: {}, Value:{}", kafkaRecord.key(), msg);
            logger.info("Partition:{},Offset:{}", kafkaRecord.partition(), kafkaRecord.offset());

            JSONObject json = new JSONObject(msg);
            logger.info("ranapp message recieved \n-> {} \n", json);

            kafkakeyName = kafkaRecord.key();
            WebsocketClient.kafkakeyName = kafkaRecord.key();

            ANRInputMessage inputMessage;
            try {
                logger.info("Parsing the incoming json object to ANRInputMessage class");
                inputMessage = objectMapper.readValue(json.toString(), ANRInputMessage.class);
            } catch (Exception ex) {
                logger.warn("Invalid Message received . Exception while parsing JSON object -> {}",
                        ex.getLocalizedMessage(), ex);
                return;
            }

            logger.info("Invoking convertInputMessagetoANROutput Function");
            ANROutputMessage outputMessage = convertInputMessagetoANROutput(inputMessage);
            logger.info("ANROutputMessage");
            // Logged as an argument, never as a format string: the JSON may contain '%'.
            logger.info("{}", objectMapper.writeValueAsString(outputMessage));

            // Output Payload is sent as Websocket message to RC
            DeviceData deviceData = new DeviceData();
            deviceData.setMessage(objectMapper.writeValueAsString(outputMessage.getPayload()));
            deviceData.setType(MessageType.KAFKA_MSG);
            logger.info("Sending Output Payload to RC WS Server");
            websocketclient.sendMessage(deviceData);
        } catch (Exception e) {
            logger.error("Failed to process record. Partition:{},Offset:{}",
                    kafkaRecord.partition(), kafkaRecord.offset(), e);
        }
    }

    /**
     * Builds the output message for one ANR input message.
     *
     * <p>The input cell id becomes the {@code Common} cell identity, the input PNF
     * name becomes the {@code FAPService} {@code idNRCellCU}, and every neighbour
     * is mapped to one {@link LTECell}. A neighbour's {@code isHOAllowed} is the
     * string {@code "true"} when its {@code hoKpi1} is strictly below
     * {@link #hokpiThreshold} and {@code "false"} otherwise. The result always
     * holds exactly one {@link Configuration}.
     *
     * @param inputMessage parsed input message; its payload and neighbour list
     *                     are dereferenced without null checks
     * @return the populated output message
     */
    private ANROutputMessage convertInputMessagetoANROutput(ANRInputMessage inputMessage) {
        logger.info("Creating OutputMessage from ANRInput Function");
        logger.info("initializing output parameters");
        var payload = inputMessage.getPayload();
        var neighbours = payload.getNeighbours();

        logger.info("Iterating Neighbour cells to validate KPI");
        List<LTECell> lteCells = new LinkedList<>();
        for (var neighbour : neighbours) {
            LTECell lteCell = new LTECell();
            lteCell.setIdGNBCUCPFunction(neighbour.getPNFName());
            lteCell.setPlmnid(neighbour.getPlmnid());
            lteCell.setnRTCI(0);
            lteCell.setIdNRCellRelation(neighbour.getCellID());
            // Identify KPI Measurement
            lteCell.setIsHOAllowed(String.valueOf(neighbour.getHoKpi1() < hokpiThreshold));
            lteCells.add(lteCell);
        }

        // Assemble the nested output structure from the innermost element outwards.
        Common common = new Common();
        common.setCellIdentity(payload.getCellID());

        NeighborListInUse neighbourListInUse = new NeighborListInUse();
        neighbourListInUse.setLTECell(lteCells);
        neighbourListInUse.setLTECellNumberOfEntries(Integer.toString(lteCells.size()));

        RAN ran = new RAN();
        ran.setCommon(common);
        ran.setNeighborListInUse(neighbourListInUse);

        LTE lte = new LTE();
        lte.setRan(ran);

        CellConfig cellConfig = new CellConfig();
        cellConfig.setLte(lte);

        FAPService fapService = new FAPService();
        fapService.setIdNRCellCU(payload.getPNFName());
        fapService.setCellConfig(cellConfig);

        Data data = new Data();
        data.setFAPService(fapService);

        Configuration configuration = new Configuration();
        configuration.setData(data);

        //how do we iterate in a standard way, currently working in a static way
        List<Configuration> configurations = new LinkedList<>();
        configurations.add(configuration);

        PayloadOutput payloadOutput = new PayloadOutput();
        payloadOutput.setConfigurations(configurations);

        ANROutputMessage outputMessage = new ANROutputMessage();
        outputMessage.setPayload(payloadOutput);
        logger.info("returning back output msg");
        return outputMessage;
    }
}