aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java2
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java63
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java10
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java6
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java26
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java53
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java120
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java176
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java112
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java84
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java80
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java204
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java361
13 files changed, 554 insertions, 743 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
index 95b31fb..7b649a4 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
@@ -44,6 +44,6 @@ public interface GenericAdapter {
* @throws ConfigFileSmooksConversionException if unable to convert config file data to smooks object
*
*/
- String transform(String incomingJsonString, String eventType) throws ConfigFileSmooksConversionException, VesException;
+ String transform(String incomingJsonString) throws ConfigFileSmooksConversionException, VesException;
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
index b922b45..c161977 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
@@ -24,18 +24,21 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
import javax.annotation.PreDestroy;
+
import org.milyn.Smooks;
import org.onap.dcaegen2.ves.domain.VesEvent;
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
import org.onap.universalvesadapter.service.VESAdapterInitializer;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
import org.onap.universalvesadapter.utils.SmooksUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.SAXException;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
@@ -53,14 +56,11 @@ import com.google.gson.JsonSyntaxException;
@Component
public class UniversalEventAdapter implements GenericAdapter {
-
- private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
- private String enterpriseId;
- @Value("${defaultEnterpriseId}")
- private String defaultEnterpriseId;
+ private String collectorIdentifierValue;
+ private String collectorIdentifierKey;
private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>();
public UniversalEventAdapter() {
@@ -74,34 +74,53 @@ public class UniversalEventAdapter implements GenericAdapter {
* @return ves Event
*/
@Override
- public String transform(String incomingJsonString, String eventType)
+ public String transform(String incomingJsonString)
throws ConfigFileSmooksConversionException, VesException {
String result = "";
String configFileData;
+
+ String identifier[]= CollectorConfigPropertyRetrival.getProperyArray("identifier");
+ String defaultMappingFile="defaultMappingFile-"+Thread.currentThread().getName();
try {
Gson gson = new Gson();
JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class);
- JsonElement results = body.get("notify OID");
- String notifyOid = results.getAsString();
-
- // extracting enterprise id from notify OID of SNMP trap.
- enterpriseId = notifyOid.substring(0, notifyOid.length() - 4);
+
+ JsonElement results;
+ for(int i=0;i<identifier.length;i++)
+ {
+ if(body.has(identifier[i]))
+ {
+ collectorIdentifierKey=identifier[i];
+ results=body.get(identifier[i]);
+ collectorIdentifierValue=results.getAsString();
+
+ }
+
+ }
+ //collectorIdentifierValue = collectorIdentifierValue.substring(0, collectorIdentifierValue.length() - 4);
+ if(collectorIdentifierKey.equals("notify OID"))
+ {
+ collectorIdentifierValue = collectorIdentifierValue.substring(0, collectorIdentifierValue.length() - 4);
+ }
+
- if (VESAdapterInitializer.getMappingFiles().containsKey(enterpriseId)) {
- configFileData = VESAdapterInitializer.getMappingFiles().get(enterpriseId);
- debugLogger.debug("Using Mapping file as Mapping file is available for Enterprise Id:{}",enterpriseId);
+ if (VESAdapterInitializer.getMappingFiles().containsKey(collectorIdentifierValue)) {
+ configFileData = VESAdapterInitializer.getMappingFiles().get(collectorIdentifierValue);
+ debugLogger.debug("Using Mapping file as Mapping file is available for collector identifier:{}",collectorIdentifierValue);
+
} else {
- configFileData = VESAdapterInitializer.getMappingFiles().get(defaultEnterpriseId);
- debugLogger.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId);
+ configFileData = VESAdapterInitializer.getMappingFiles().get(defaultMappingFile);
+
+ debugLogger.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",collectorIdentifierValue);
}
Smooks smooksTemp = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
- eventToSmooksMapping.put(eventType, smooksTemp);
+ eventToSmooksMapping.put(collectorIdentifierKey, smooksTemp);
VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,incomingJsonString);
- debugLogger.info("Incoming json transformed to VES format successfully");
+ debugLogger.info("Incoming json transformed to VES format successfully:"+Thread.currentThread().getName());
ObjectMapper objectMapper = new ObjectMapper();
result = objectMapper.writeValueAsString(vesEvent);
debugLogger.info("Serialized VES json");
@@ -109,7 +128,8 @@ public class UniversalEventAdapter implements GenericAdapter {
throw new VesException("Unable to convert pojo to VES format, Reason :{}", exception);
} catch (SAXException | IOException exception) {
//Invalid Mapping file
- errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception);
+ exception.printStackTrace();
+ errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception.getMessage());
} catch (JsonSyntaxException exception) {
// Invalid Trap
@@ -120,7 +140,8 @@ public class UniversalEventAdapter implements GenericAdapter {
}
catch (RuntimeException exception) {
- errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception);
+ exception.printStackTrace();
+ errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception.getMessage());
}
return result;
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
index 2b8158a..72a7d9c 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
@@ -114,11 +114,11 @@ public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig {
this.protocol = dmaapConfig.getDEFAULT_PROTOCOL();
this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID();
- this.consumerGroup = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
- this.timeoutMS =dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
- this.messageLimit = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
- this.timeoutMSParam=dmaapConfig.getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
- this.messageLimitParam=dmaapConfig.getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
+ this.consumerGroup = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
+ this.timeoutMS =dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
+ this.messageLimit = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
+ this.timeoutMSParam=dmaapConfig.getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
+ this.messageLimitParam=dmaapConfig.getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
index 602857f..0621e86 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
@@ -45,8 +45,6 @@ public class VesController {
@Autowired
private VesService vesService;
- @Autowired
- private VESAdapterInitializer vESAdapterInitializer;
/**
* @return message that application is started
@@ -66,9 +64,9 @@ public class VesController {
}
@RequestMapping("/reload")
- public void reloadMappingFileFromDB() {
+ public void reloadMappingFile() {
debugLogger.debug("Reload of Mapping File is started");
- vESAdapterInitializer.fetchMappingFile();
+ //vESAdapterInitializer.readJsonToMap();
debugLogger.debug("Reload of Mapping File is completed");
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
index 98191f7..ce75a6e 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
@@ -39,30 +39,15 @@ public class Creator {
private DMaaPMRFactory dMaaPMRFactoryInstance;
private String dmaaphost;
private String publisherTopic;
- private String subcriberTopic;
+ @Autowired
private DmaapConfig dmaapConfig;
- @Autowired
- public void setDmaapConfig(DmaapConfig dmaapConfig) {
- this.dmaapConfig = dmaapConfig;
- }
-
- public DmaapConfig getDmaapConfig() {
- return dmaapConfig;
- }
-
-
- public Creator() {
-
- }
-
// prop initializer
public void propertyFileInitializer() {
this.dmaaphost = dmaapConfig.getDmaaphost();
this.publisherTopic = dmaapConfig.getPublisherTopic();
- this.subcriberTopic = dmaapConfig.getSubscriberTopic();
this.dMaaPMRFactoryInstance = DMaaPMRFactory.create();
debugLogger.info("The Hostname of DMaap is :" + dmaaphost);
@@ -82,7 +67,7 @@ public class Creator {
}
// subscriber
- public DMaaPMRSubscriber getDMaaPMRSubscriber(){
+ public DMaaPMRSubscriber getDMaaPMRSubscriber(String subcriberTopic){
propertyFileInitializer();
DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null;
try {
@@ -95,5 +80,12 @@ public class Creator {
return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig);
}
+ public void setDmaapConfig(DmaapConfig dmaapConfig) {
+ this.dmaapConfig = dmaapConfig;
+ }
+
+ public DmaapConfig getDmaapConfig() {
+ return dmaapConfig;
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
deleted file mode 100644
index 43766ef..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-//import org.onap.universalvesadapter.adapter.GenericAdapter;
-//import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
-//import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * This service is written to identify the different type of events
- *
- * @author kmalbari
- *
- */
-@Component
-public class AdapterService {
-
-
- /**
- * Identifies eventype by parsing the incoming json file.
- *
- * @param incomingJsonString
- *
- * @return the event type
- * @throws MapperConfigException
- * if mapper config did not perform correctly
- */
- public String identifyEventTypeFromIncomingJson(String incomingJsonString) throws MapperConfigException {
-
- return MapperConfigUtils.checkIncomingJsonForMatchingDomain(incomingJsonString);
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
deleted file mode 100644
index 26aad94..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
-import org.onap.universalvesadapter.dmaap.Creator;
-import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
-import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
-import org.onap.universalvesadapter.exception.DMaapException;
-import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.utils.DmaapConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DMaapService {
-
- private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
- private static List<String> list = new LinkedList<String>();
- @Autowired
- private UniversalEventAdapter eventAdapter;
- @Autowired
- private DmaapConfig dmaapConfig;
-
- /**
- * It fetches events from DMaap in JSON, transforms JSON to VES format and
- * publishes it to outgoing DMaap MR Topic
- *
- * @param DMaaPMRSubscriber,DMaaPMRPublisher
- * @return
- */
- public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater)
- throws InterruptedException {
- metricsLogger.info("fetch and publish from and to Dmaap started");
- int pollingInternalInt=dmaapConfig.getPollingInterval();
- debugLogger.info("The Polling Interval in Milli Second is :{}" +pollingInternalInt);
- while (true) {
- synchronized (this) {
- for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) {
- list.add(incomingJsonString);
-
- }
-
- if (list.isEmpty()) {
- Thread.sleep(pollingInternalInt);
- }
- debugLogger.debug("number of messages to be converted :{}", list.size());
-
- if (!list.isEmpty()) {
- String val = ((LinkedList<String>) list).removeFirst();
- List<String> messages = new ArrayList<>();
- String vesEvent = processReceivedJson(val);
- if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
- messages.add(vesEvent);
- publisher.publish(messages);
- metricsLogger.info("Message successfully published to DMaaP Topic");
- }
-
- }
-
- }
- }
- }
-
- /**
- * It finds mapping file for received json, transforms json to VES format
- *
- * @param incomingJsonString
- * @return
- */
- private String processReceivedJson(String incomingJsonString) {
- String outgoingJsonString = null;
- if (!"".equals(incomingJsonString)) {
-
- try {
- /*
- * For Future events String eventType =
- * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
- *
- * LOGGER.debug("Event identified as " + eventType);
- */
-
- outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp");
-
- } catch (VesException exception) {
- errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
- debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
- } catch (DMaapException e) {
- errorLogger.error("Received exception : {}", e.getMessage());
- }
- }
- return outgoingJsonString;
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
index dbf451d..220c5e0 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP : DCAE
* ================================================================================
-* Copyright 2018 TechMahindra
+* Copyright 2018-2019 TechMahindra
*=================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,21 +20,18 @@
package org.onap.universalvesadapter.service;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.codec.binary.Hex;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
import org.onap.universalvesadapter.utils.DmaapConfig;
+import org.onap.universalvesadapter.utils.FetchDynamicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -48,7 +45,6 @@ import org.springframework.stereotype.Component;
//AdapterInitializer
@Component
public class VESAdapterInitializer implements CommandLineRunner, Ordered {
- private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
@@ -56,29 +52,16 @@ public class VESAdapterInitializer implements CommandLineRunner, Ordered {
private Creator creator;
@Autowired
private DmaapConfig dmaapConfig;
- @Value("${spring.datasource.url}")
- String dBurl;
- @Value("${spring.datasource.username}")
- String user;
- @Value("${spring.datasource.password}")
- String pwd;
- @Value("${defaultMappingFilelocation}")
- String defaultMappingFileLocation;
- @Value("${MappingFileTableName}")
- String MappingFileTableName;
- @Value("${defaultEnterpriseId}")
- String defaultEnterpriseId;
@Value("${server.port}")
String serverPort;
private static Map<String, String> mappingFiles = new HashMap<String, String>();
private static Map<String, String> env;
- private static String url;
- public static String retString;
- public static String retCBSString;
- public static String configFile = "/opt/app/KV-Configuration.json";
- byte[] bytesArray = null;
- @Autowired private ApplicationContext applicationContext;
+ public static String configFile = "/opt/app/VESAdapter/conf/kv.json";
+ //public static String configFile = "src\\main\\resources\\kv.json";
+
+ @Autowired
+ private ApplicationContext applicationContext;
@Override
public void run(String... args) throws Exception {
@@ -86,64 +69,36 @@ public class VESAdapterInitializer implements CommandLineRunner, Ordered {
for (Map.Entry<String, String> entry : env.entrySet()) {
debugLogger.debug(entry.getKey() + ":" + entry.getValue());
}
-
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) {
- //TODO - Add logic to talk to Consul and CBS to get the configuration. For now, we will refer to configuration coming from docker env parameters
-
- debugLogger.info(">>>Dynamic configuration to be used");
+
+ //checks for DMaaP Host and Port No
+ if( (env.get("DMAAPHOST")==null ||(env.get("MR_DEFAULT_PORT_NUMBER")==null))) {
- if( (env.get("DMAAPHOST")==null ||
- (env.get("MR_DEFAULT_PORT_NUMBER")==null ||
- (env.get("URL_JDBC")==null ||
- (env.get("JDBC_USERNAME")==null ||
- (env.get("JDBC_PASSWORD")==null )))))) {
-
-
- errorLogger.error("Some docker environment parameter is missing. Sample Usage is -\n sudo docker run -d -p 8085:8085/tcp --env URL_JDBC=jdbc:postgresql://10.53.172.129:5432/dummy --env JDBC_USERNAME=ngpuser --env JDBC_PASSWORD=root --env MR_DMAAPHOST=10.10.10.10 --env MR_DEFAULT_PORT_NUMBER=3904 --env CONSUL_HOST=10.53.172.109 --env HOSTNAME=mvp-dcaegen2-collectors-ves --env CONFIG_BINDING_SERVICE=config_binding_service -e DMAAPHOST='10.53.172.156' onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest");
- System.exit(SpringApplication.exit(applicationContext, () -> {errorLogger.error("Application stoped due to missing default mapping file");return-1;}));
-
- }else {
-
-
-
- dmaapConfig.setDmaaphost(env.get("DMAAPHOST"));
- dmaapConfig.setDEFAULT_PORT_NUMBER(Integer.parseInt(env.get("MR_DEFAULT_PORT_NUMBER")));
- creator.setDmaapConfig(dmaapConfig);
-
- dBurl=env.get("URL_JDBC");
- user=env.get("JDBC_USERNAME");
- pwd=env.get("JDBC_PASSWORD");
- }
+ errorLogger.error("Some docker environment parameter is missing. Sample Usage is -\n sudo docker run -d -p 8085:8085/tcp --env MR_DEFAULT_PORT_NUMBER=3904 --env CONSUL_HOST=10.53.172.109 --env HOSTNAME=mvp-dcaegen2-service-mua --env CONFIG_BINDING_SERVICE=config_binding_service --env DMAAPHOST='10.53.172.156' onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest");
+ System.exit(SpringApplication.exit(applicationContext, () -> {errorLogger.error("Application stoped due missing DMAAPHOST or MR_DEFAULT_PORT_NUMBER environment varibales.Please refer above example for environment varibales to pass ");return-1;}));
+ }
-
+ dmaapConfig.setDmaaphost(env.get("DMAAPHOST"));
+ dmaapConfig.setDEFAULT_PORT_NUMBER(Integer.parseInt(env.get("MR_DEFAULT_PORT_NUMBER")));
+ creator.setDmaapConfig(dmaapConfig);
+ //check for consul details
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) {
+ debugLogger.info(">>>Dynamic configuration to be used");
+ FetchDynamicConfig.cbsCall();
+
} else {
debugLogger.info(">>>Static configuration to be used");
+
}
- prepareDatabase();
- fetchMappingFile();
+ readJsonToMap(configFile);
+
+ //prepareDatabase();
+ //fetchMappingFile();
debugLogger.info("Triggering controller's start url ");
executecurl("http://localhost:"+serverPort+"/start");
}
- private void getconsul() {
-
- //TODO
- }
-
- public static boolean verifyConfigChange() {
- //TODO
- return false;
- }
-
- public static void getCBS() {
- //TODO
- }
-
- public static void writefile(String retCBSString) {
- //TODO
- }
private static String executecurl(String url) {
@@ -172,31 +127,38 @@ public class VESAdapterInitializer implements CommandLineRunner, Ordered {
}
- public void fetchMappingFile() {
+ private void readJsonToMap(String configFile) {
+ try {
+ JSONArray collectorArray=CollectorConfigPropertyRetrival.collectorConfigArray(configFile);
- try (Connection con = DriverManager.getConnection(dBurl, user, pwd);PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file");ResultSet rs = pstmt.executeQuery()) {
- debugLogger.info("Retrieving data from DB");
- // parsing the column each time is a linear search
- int column1Pos = rs.findColumn("enterpriseid");
- int column2Pos = rs.findColumn("mappingfilecontents");
- String hexString;
- while (rs.next()) {
- String column1 = rs.getString(column1Pos);
- String column2 = rs.getString(column2Pos);
- hexString = column2.substring(2);
- byte[] bytes = Hex.decodeHex(hexString.toCharArray());
- String data = new String(bytes, "UTF-8");
- mappingFiles.put(column1, data);
+ for (int i = 0; i < collectorArray.size(); i++) {
+ JSONObject obj2 = (JSONObject) collectorArray.get(i);
+
+ if (obj2.containsKey("mapping-files")) {
+
+ JSONArray a1 = (JSONArray) obj2.get("mapping-files");
+
+ for (int j = 0; j < a1.size(); j++) {
+ JSONObject obj3 = (JSONObject) a1.get(j);
+ Set<Entry<String, String>> set = obj3.entrySet();
+
+ for (Entry<String, String> entry : set) {
+
+ mappingFiles.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ }
}
- debugLogger.info("DB Initialization Completed, Total # Mappingfiles are:{}" , mappingFiles.size());
+
} catch (Exception e) {
- errorLogger.error("Error occured due to :{}", e.getMessage());
+ errorLogger.error("Exception occured while reading Collector config file cause: ",e.getCause());
}
}
- private void prepareDatabase() throws IOException {
+ /* private void prepareDatabase() throws IOException {
debugLogger.info("The Default Mapping file Location:" + defaultMappingFileLocation.trim());
@@ -277,7 +239,33 @@ public class VESAdapterInitializer implements CommandLineRunner, Ordered {
}));
}
- }
+ }*/
+ /*public void fetchMappingFile() {
+
+ try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {
+ debugLogger.info("Retrieving data from DB");
+ PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file");
+ ResultSet rs = pstmt.executeQuery();
+ // parsing the column each time is a linear search
+ int column1Pos = rs.findColumn("enterpriseid");
+ int column2Pos = rs.findColumn("mappingfilecontents");
+ String hexString;
+ while (rs.next()) {
+ String column1 = rs.getString(column1Pos);
+ String column2 = rs.getString(column2Pos);
+ hexString = column2.substring(2);
+ byte[] bytes = Hex.decodeHex(hexString.toCharArray());
+ String data = new String(bytes, "UTF-8");
+ mappingFiles.put(column1, data);
+ }
+ debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + mappingFiles.size());
+ } catch (Exception e) {
+ errorLogger.error("Error occured due to :" + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }*/
+
public static Map<String, String> getMappingFiles() {
return mappingFiles;
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 06ef080..79317ea 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,10 +19,24 @@
*/
package org.onap.universalvesadapter.service;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
import org.onap.universalvesadapter.dmaap.Creator;
import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.exception.DMaapException;
import org.onap.universalvesadapter.exception.MapperConfigException;
+import org.onap.universalvesadapter.exception.VesException;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
+import org.onap.universalvesadapter.utils.DmaapConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,12 +56,13 @@ public class VesService {
private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
private boolean isRunning = true;
-
- @Autowired
- private DMaapService dmaapService;
-
@Autowired
private Creator creator;
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+ @Autowired
+ private DmaapConfig dmaapConfig;
+ private static List<String> list = new LinkedList<String>();
/**
@@ -55,27 +70,66 @@ public class VesService {
*/
public void start() throws MapperConfigException {
debugLogger.info("Creating Subcriber and Publisher with creator.............");
- DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber();
+
DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
- // Create subscriber & publisher thread
- Thread t1 = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
+ String topicArray[]= CollectorConfigPropertyRetrival.getProperyArray("subscriberTopic");
+
+
+ ExecutorService executorService=Executors.newFixedThreadPool(topicArray.length);
+ for(int i=0;i<topicArray.length;i++) {
+ String topicName =topicArray[i];
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicArray[i]);
+
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run(){
+
+ Thread.currentThread().setName(topicName);
+ metricsLogger.info("fetch and publish from and to Dmaap started:"+Thread.currentThread().getName());
+ int pollingInternalInt=dmaapConfig.getPollingInterval();
+ debugLogger.info("The Polling Interval in Milli Second is :{}" +pollingInternalInt);
debugLogger.info("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
- dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator);
- } catch (InterruptedException e) {
- errorLogger.error("Exception in starting of subscriber & publisher thread:{}",e);
- Thread.currentThread().interrupt();
- }
- }
- });
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : subcriber.fetchMessages().getFetchedMessages()) {
+ list.add(incomingJsonString);
- // Start subscriber & publisher thread
- t1.setName("SNMP-COLLECTOR");
- t1.start();
+ }
+
+ if (list.isEmpty()) {
+ try {
+ Thread.sleep(pollingInternalInt);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ debugLogger.debug("number of messages to be converted :{}", list.size());
+
+ if (!list.isEmpty()) {
+ String val = ((LinkedList<String>) list).removeFirst();
+ List<String> messages = new ArrayList<>();
+ String vesEvent = processReceivedJson(val);
+ if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+ metricsLogger.info("Message successfully published to DMaaP Topic");
+ }
+
+ }
+
+ }
+ }
+
+
+
+ }
+ });
+ }
+
+
}
@@ -85,5 +139,23 @@ public class VesService {
public void stop() {
isRunning = false;
}
+
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString);
+
+ } catch (VesException exception) {
+ errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
+ debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ errorLogger.error("Received exception : {}", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
new file mode 100644
index 0000000..9de7e63
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
@@ -0,0 +1,84 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.utils;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CollectorConfigPropertyRetrival {
+
+
+ public static String configFile = "/opt/app/VESAdapter/conf/kv.json";
+ //public static String configFile = "src\\main\\resources\\kv.json";
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+ private static JSONArray array;
+
+ public static JSONArray collectorConfigArray(String configFile){
+ try {
+ JSONParser parser = new JSONParser();
+ FileReader fileReader = new FileReader(configFile);
+ JSONObject obj = (JSONObject) parser.parse(fileReader);
+ JSONObject appobj = (JSONObject) obj.get("app_preferences");
+ array =(JSONArray) appobj.get("collectors");
+
+ debugLogger.info("Retrieved JsonArray from Collector Config File");
+ } catch (ParseException e) {
+ errorLogger.error("ParseException occured at position:",e.getPosition());
+ } catch (FileNotFoundException e) {
+
+ errorLogger.error("Collector Config File is not found..",e.getMessage());
+ } catch (IOException e) {
+
+ errorLogger.error("Error occured due to :",e.getMessage());
+ }
+
+
+ return array;
+
+ }
+
+
+ public static String [] getProperyArray(String properyName) {
+ JSONArray jsonArray =collectorConfigArray(configFile);
+
+ String [] propertyArray=new String[jsonArray.size()];
+
+ for (int k=0;k<jsonArray.size();k++) {
+
+ JSONObject collJson= (JSONObject) jsonArray.get(k);
+
+ propertyArray[k]=(String) collJson.get(properyName);
+ }
+ debugLogger.info("returning "+properyName+" array from Collector Config");
+ return propertyArray;
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
index 11bba61..02b54c9 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
@@ -26,16 +26,15 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@Component
-@PropertySource(value = {"classpath:application.properties","classpath:DMaapMR.properties"})
+@PropertySource(value = {"classpath:application.properties","classpath:mapper.properties"})
@ConfigurationProperties
public class DmaapConfig {
- @Value("${mr.dmaaphost}")
+ // Hostname of DMaaP to be taken from ENV var
@NotEmpty
private String dmaaphost;
- // default port number
- @Value("${mr.DEFAULT_PORT_NUMBER}")
+ // default port number to be taken from ENV var
@NotEmpty
private int DEFAULT_PORT_NUMBER;
@@ -103,30 +102,25 @@ public class DmaapConfig {
// Subscriber Constants
- //Dmaap Subcriber Topic
- @Value("${mr.subscriber.topic}")
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
@NotEmpty
- private String subscriberTopic;
+ private int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
- @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
@NotEmpty
- private int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ private int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
- @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
@NotEmpty
- private int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ private String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
- @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
+ @Value("${mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
@NotEmpty
- private String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ private String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
- @Value("${mr.subcriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
+ @Value("${mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
@NotEmpty
- private String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
-
- @Value("${mr.subcriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
- @NotEmpty
- private String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ private String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
public void setDmaaphost(String dmaaphost) {
this.dmaaphost = dmaaphost;
@@ -241,52 +235,44 @@ public class DmaapConfig {
this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
}
- public String getSubscriberTopic() {
- return subscriberTopic;
- }
-
- public void setSubscriberTopic(String subscriberTopic) {
- this.subscriberTopic = subscriberTopic;
- }
-
- public int getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
- return subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ public int getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
+ return subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
}
- public void setSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
- this.subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ public void setsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
+ this.subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
}
- public int getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
- return subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ public int getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
+ return subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
}
- public void setSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
- this.subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ public void setsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
+ this.subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
}
- public String getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
- return subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ public String getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
+ return subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
}
- public void setSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
- this.subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ public void setsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
+ this.subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
}
- public String getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
- return subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ public String getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
+ return subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
}
- public void setSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
- this.subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ public void setsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
+ this.subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
}
- public String getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
- return subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ public String getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
+ return subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
}
- public void setSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
- this.subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ public void setsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
+ this.subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
}
public int getPollingInterval() {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java
new file mode 100644
index 0000000..f515858
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java
@@ -0,0 +1,204 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class FetchDynamicConfig {
+
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+
+ // need to change below path
+ public static String configFile = "/opt/app/VESAdapter/conf/kv.json";
+ //public static String configFile = "src\\main\\resources\\kv.json";
+ private static String url;
+ public static String retString;
+ public static String retCBSString;
+ private static Map<String, String> env;
+
+ public FetchDynamicConfig() {
+ }
+
+ public static void cbsCall() {
+
+ env = System.getenv();
+ Boolean areEqual;
+ // Call consul api and identify the CBS Service address and port
+ getconsul();
+ // Construct and invoke CBS API to get application Configuration
+ getCBS();
+ // Verify if data has changed
+ areEqual = verifyConfigChange();
+
+ if (!areEqual) {
+ FetchDynamicConfig fc = new FetchDynamicConfig();
+ fc.writefile(retCBSString);
+ } else {
+ debugLogger.info("New config pull results identical - " + configFile + " NOT refreshed");
+ }
+ }
+
+
+ private static void getconsul() {
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ // for testing :url="10.12.6.50:8500/v1/catalog/service/config_binding_service";
+ //http://10.12.6.44:8500/v1/catlog/service/config_binding_service
+
+ retString = executecurl(url);
+ debugLogger.info("CBS details fetched from Consul");
+
+ }
+
+ public static boolean verifyConfigChange() {
+
+ boolean areEqual = false;
+ // Read current data
+ try {
+ File f = new File(configFile);
+ if (f.exists() && !f.isDirectory()) {
+
+ String jsonData = readFile(configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ JsonNode tree1 = mapper.readTree(jsonObject.toString());
+ JsonNode tree2 = mapper.readTree(retCBSString);
+ areEqual = tree1.equals(tree2);
+ debugLogger.info("Comparison value:" + areEqual);
+ } else {
+ debugLogger.info("First time config file read: " + configFile);
+ }
+
+ } catch (IOException e) {
+ errorLogger.error("Comparison with new fetched data failed" + e.getMessage());
+
+ }
+
+ return areEqual;
+
+ }
+
+ public static void getCBS() {
+
+ // consul return as array
+ JSONTokener temp = new JSONTokener(retString);
+ JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
+
+ String urlPart1 = null;
+ if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
+
+ urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
+
+ }
+ debugLogger.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
+
+ if (env.containsKey("HOSTNAME")) {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retCBSString = executecurl(url);
+ } else if (env.containsKey("SERVICE_NAME")) {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retCBSString = executecurl(url);
+ } else {
+ errorLogger.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
+ }
+
+ }
+
+ public void writefile(String retCBSString) {
+ debugLogger.info("URL to fetch configuration:" + url);
+
+ String indentedretstring = (new JSONObject(retCBSString)).toString(4);
+
+ try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
+ file.write(indentedretstring);
+
+ debugLogger.info("Successfully Copied JSON Object to file " + configFile);
+ } catch (IOException e) {
+ errorLogger.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ public static String readFile(String filename) {
+ String result = "";
+ try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+ result = sb.toString();
+ } catch (FileNotFoundException e) {
+ errorLogger.error("colud not find file :",filename);
+
+ } catch (Exception e) {
+ errorLogger.error("unable to read the file , reason:",e.getCause());
+ }
+ return result;
+ }
+ private static String executecurl(String url) {
+
+ String[] command = { "curl", "-v", url };
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ debugLogger.info(result);
+
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ errorLogger.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
deleted file mode 100644
index fb7ed26..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.mappingconfig.Entry;
-import org.onap.universalvesadapter.mappingconfig.Evaluation;
-import org.onap.universalvesadapter.mappingconfig.MapperConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * This class will be used in Future for different telemetry data:
- * This class will be utility class to read the mapper config file and parse the
- * config to prepare the grammar to detect the incoming json's event type.
- *
- * @author kmalbari
- *
- */
-public class MapperConfigUtils {
-
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
- private static Set<Entry> entries = new TreeSet<>((o1, o2) -> o1.getPriority().compareTo(o2.getPriority()));
-
- private enum JoinOperator {
- AND("AND"), OR("OR");
-
- private final String value;
-
- private JoinOperator(final String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
- }
-
- private enum ExpressionOperator {
- EQUALS("EQUALS"), STARTSWITH("STARTSWITH"), ENDSWITH("ENDSWITH"), CONTAINS("CONTAINS");
-
- private final String value;
-
- private ExpressionOperator(final String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
- }
-
- private enum DataType {
- STRING("STRING"), DOUBLE("DOUBLE");
-
- private final String value;
-
- private DataType(final String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
- }
-
- /**
- * main method.
- *
- * @param args
- * arguments
- */
- public static void main(String[] args) {
-
- String mappingFile = " { " + " \"entries\": [{ " + " \"priority\": 4, " + " \"evaluation\": { "
- + " \"operand\": \"AND\", " + " \"field\": null, " + " \"value\": null, "
- + " \"datatype\": null, " + " \"lhs\": { " + " \"operand\": \"OR\", "
- + " \"field\": null, " + " \"value\": null, "
- + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"domain\", "
- + " \"value\": \"snmp-heartbeat\", " + " \"datatype\": \"string\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " }, "
- + " \"rhs\": { " + " \"operand\": \"EQUALS\", "
- + " \"field\": \"domain\", " + " \"value\": \"snmp-fault\", "
- + " \"datatype\": \"string\", " + " \"lhs\": null, "
- + " \"rhs\": null " + " } " + " }, " + " \"rhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"trap version\", "
- + " \"value\": \"1.2\", " + " \"datatype\": \"float\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " } " + " }, "
- + " \"result\": \"smooks.config\" " + " }, { " + " \"priority\": 1, "
- + " \"evaluation\": { " + " \"operand\": \"AND\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"OR\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"domain\", "
- + " \"value\": \"snmp-heartbeat\", " + " \"datatype\": \"string\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " }, "
- + " \"rhs\": { " + " \"operand\": \"EQUALS\", "
- + " \"field\": \"domain\", " + " \"value\": \"snmp-fault\", "
- + " \"datatype\": \"string\", " + " \"lhs\": null, "
- + " \"rhs\": null " + " } " + " }, " + " \"rhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"trap version\", "
- + " \"value\": \"1.2\", " + " \"datatype\": \"float\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " } " + " }, "
- + " \"result\": \"smooks.config\" " + " }, { " + " \"priority\": 3, "
- + " \"evaluation\": { " + " \"operand\": \"AND\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"OR\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"domain\", "
- + " \"value\": \"snmp-heartbeat\", " + " \"datatype\": \"string\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " }, "
- + " \"rhs\": { " + " \"operand\": \"EQUALS\", "
- + " \"field\": \"domain\", " + " \"value\": \"snmp-fault\", "
- + " \"datatype\": \"string\", " + " \"lhs\": null, "
- + " \"rhs\": null " + " } " + " }, " + " \"rhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"trap version\", "
- + " \"value\": \"1.2\", " + " \"datatype\": \"float\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " } " + " }, "
- + " \"result\": \"smooks.config\" " + " }, { " + " \"priority\": 2, "
- + " \"evaluation\": { " + " \"operand\": \"AND\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"OR\", " + " \"field\": null, "
- + " \"value\": null, " + " \"datatype\": null, " + " \"lhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"domain\", "
- + " \"value\": \"snmp-heartbeat\", " + " \"datatype\": \"string\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " }, "
- + " \"rhs\": { " + " \"operand\": \"EQUALS\", "
- + " \"field\": \"domain\", " + " \"value\": \"snmp-fault\", "
- + " \"datatype\": \"string\", " + " \"lhs\": null, "
- + " \"rhs\": null " + " } " + " }, " + " \"rhs\": { "
- + " \"operand\": \"EQUALS\", " + " \"field\": \"trap version\", "
- + " \"value\": \"1.2\", " + " \"datatype\": \"float\", "
- + " \"lhs\": null, " + " \"rhs\": null " + " } " + " }, "
- + " \"result\": \"smooks.config\" " + " }] " + "}";
- String incomingJsonString = "{\"domain\":\"snmp-heartbeat\",\"trap version\":1.2}";
-
- try {
- readMapperConfigFile(mappingFile);
- checkIncomingJsonForMatchingDomain(incomingJsonString);
- } catch (MapperConfigException e) {
- errorLogger.error("Exception in mapperConfigFile reading:{}",e);
- }
-
- }
-
- /**
- * Checks incoming json to see which of the domain it mathces as per mapper
- * config entries. If nothing matches, a default mapping will be used.
- *
- * @param incomingJsonString
- * incoming json
- * @throws MapperConfigException
- * if error occurs in operation
- */
- public static String checkIncomingJsonForMatchingDomain(String incomingJsonString) throws MapperConfigException {
- ObjectMapper mapper = new ObjectMapper();
- JsonNode actualObj = null;
- try {
- actualObj = mapper.readTree(incomingJsonString);
- } catch (IOException exception) {
- throw new MapperConfigException("Unable to read incoming json in a tree " + exception.getMessage(),
- exception);
- }
- for (Entry entry : entries) {
-
- boolean result = false;
- result = evaluateEntryMatch(entry.getEvaluation(), actualObj);
- if (result) {
- return entry.getResult();
- }
- }
- return "default";
- }
-
- /**
- * Reads the mapper config file.
- *
- * @param mappingFileData
- * string json for mapper config
- * @throws MapperConfigException
- * if error in mapper config
- */
- public static void readMapperConfigFile(String mappingFileData) throws MapperConfigException {
-
- ObjectMapper name = new ObjectMapper();
- MapperConfig config = null;
- try {
- config = name.readValue(mappingFileData, MapperConfig.class);
- } catch (IOException exception) {
- throw new MapperConfigException("Unable to read config file for reason...\n " + exception.getMessage(),
- exception);
- }
- debugLogger.debug("Read config file content into :{}",config);
-
- if (null != config) {
- entries.addAll(config.getEntries());
- } else {
- throw new MapperConfigException("Unable to generate configuration for different domains.");
- }
- }
-
- /**
- * Evaluates the passed in {@code Evaluation} instance and return boolean
- * result.
- *
- * @param evaluation
- * evaluation instance
- * @param actualObj
- * Json node with values to compare with
- * @return true if matches evaluation else false
- * @throws MapperConfigException
- * error in evaluation
- */
- public static boolean evaluateEntryMatch(Evaluation evaluation, JsonNode actualObj) throws MapperConfigException {
- if (null == evaluation) {
- throw new MapperConfigException("Cannot have null evaluation");
- }
- if (null != evaluation.getOperand()) {
-
- if (MapperConfigUtils.isValidEnum(JoinOperator.class, evaluation.getOperand())) {
- // if(JOIN_OPERATOR.contains(evaluation.getOperand())){
- switch (JoinOperator.valueOf(evaluation.getOperand())) {
- case AND:
- return evaluateEntryMatch(evaluation.getLhs(), actualObj)
- && evaluateEntryMatch(evaluation.getRhs(), actualObj);
- case OR:
- return evaluateEntryMatch(evaluation.getLhs(), actualObj)
- || evaluateEntryMatch(evaluation.getRhs(), actualObj);
- default:
- break;
- }
- }
-
- if (MapperConfigUtils.isValidEnum(ExpressionOperator.class, evaluation.getOperand())) {
- // if(EXPR_OPERATOR.contains(evaluation.getOperand())){
-
- // currently it is assumed field being compared is first level
- // child of incoming JSON structure.
- // If needed, can write a JsonPath implementation later
- String field = evaluation.getField();
- if (null != field && null != evaluation.getDatatype() && actualObj.has(field)) {
- switch (ExpressionOperator.valueOf(evaluation.getOperand())) {
- case EQUALS:
- if (MapperConfigUtils.isValidEnum(DataType.class, evaluation.getDatatype())) {
- switch (DataType.valueOf(evaluation.getDatatype())) {
- case STRING:
- if (null != actualObj.get(field))
- return actualObj.get(field).asText().equals(evaluation.getValue());
- break;
- case DOUBLE:
- if (null != actualObj.get(field))
- return actualObj.get(field).asDouble() == Double
- .valueOf(evaluation.getValue());
- break;
- default:
- return false;
- }
- } else
- return false;
- break;
- case STARTSWITH:
- if (MapperConfigUtils.isValidEnum(DataType.class, evaluation.getDatatype())) {
- switch (DataType.valueOf(evaluation.getDatatype())) {
- case STRING:
- if (null != actualObj.get(field))
- return actualObj.get(field).asText().startsWith(evaluation.getValue());
- break;
- default:
- return false;
- }
- } else
- return false;
- break;
- case ENDSWITH:
- if (MapperConfigUtils.isValidEnum(DataType.class, evaluation.getDatatype())) {
- switch (DataType.valueOf(evaluation.getDatatype())) {
- case STRING:
- if (null != actualObj.get(field))
- return actualObj.get(field).asText().endsWith(evaluation.getValue());
- break;
- default:
- return false;
- }
- } else
- return false;
- break;
- case CONTAINS:
- if (MapperConfigUtils.isValidEnum(DataType.class, evaluation.getDatatype())) {
- switch (DataType.valueOf(evaluation.getDatatype())) {
- case STRING:
- if (null != actualObj.get(field))
- return actualObj.get(field).asText().contains(evaluation.getValue());
- break;
- default:
- return false;
- }
- } else
- return false;
- break;
- default:
- return false;
- }
- }
- }
- } else
- throw new MapperConfigException("Not an expected operand as per config for " + evaluation.getField());
-
- return false;
- }
- /**
- * <p>Checks if the specified name is a valid enum for the class.</p>
- *
- * <p>This method differs from {@link Enum#valueOf} in that checks if the name is
- * a valid enum without needing to catch the exception.</p>
- *
- * @param <E> the type of the enumeration
- * @param enumClass the class of the enum to query, not null
- * @param enumName the enum name, null returns false
- * @return true if the enum name is valid, otherwise false
- */
-
- public static <E extends Enum<E>> boolean isValidEnum(final Class<E> enumClass, final String enumName) {
- if (enumName == null) {
- return false;
- }
- try {
- Enum.valueOf(enumClass, enumName);
- return true;
- } catch (final IllegalArgumentException ex) {
- return false;
- }
- }
-
-
-}