aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java8
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java129
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java24
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java187
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java255
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java337
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java56
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java42
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java7
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java37
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java25
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java68
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java383
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java93
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java116
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java91
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java48
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java220
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java45
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java126
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java50
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java60
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java56
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java46
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java128
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java53
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java79
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java57
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java44
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java3
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java29
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java3
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java4
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java19
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java6
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java41
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java290
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java52
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java158
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java185
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java288
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java62
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java12
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java14
-rw-r--r--UniversalVesAdapter/src/main/resources/DMaapMR.properties45
-rw-r--r--UniversalVesAdapter/src/main/resources/application.properties23
-rw-r--r--UniversalVesAdapter/src/main/resources/dme2/consumer.properties61
-rw-r--r--UniversalVesAdapter/src/main/resources/dme2/preferredRoute.properties5
-rw-r--r--UniversalVesAdapter/src/main/resources/dme2/producer.properties59
-rw-r--r--UniversalVesAdapter/src/main/resources/logback.xml79
-rw-r--r--UniversalVesAdapter/src/main/resources/snmptovesTest.xml54
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalFieldTest.java5
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalMeasurementTest.java2
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalObjectTest.java2
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalParameterTest.java10
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/adapter/UniversalEventAdapterTest.java207
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfigurationTest.java53
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DiskRepoConfigurationTest.java51
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/UniversalEventConfigurationTest.java48
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/controller/VesControllerTest.java2
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/domain/ConfigFileDataTest.java40
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileReadExceptionTest.java34
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionExceptionTest.java2
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/AdapterServiceTest.java147
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DMaapServiceTest.java3
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DiskRepoConfigFileServiceTest.java3
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/MongoDbConfigFileServiceTest.java58
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/VesServiceTest.java9
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/MapperConfigUtilsTest.java13
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/ParallelTasksTest.java72
-rw-r--r--UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/SmooksUtilsTest.java19
74 files changed, 3596 insertions, 1807 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
index fa3b89c..95b31fb 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
@@ -19,7 +19,7 @@
*/
package org.onap.universalvesadapter.adapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
+
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
@@ -32,7 +32,6 @@ import org.onap.universalvesadapter.exception.VesException;
*/
public interface GenericAdapter {
-// String transform(String incomingJsonString) throws ConfigFileReadException;
/**
* It will take in an incoming json and identify the json type for different
@@ -42,10 +41,9 @@ public interface GenericAdapter {
* @param incomingJsonString json that is received on DMaap topic
* @param eventType type identified from incoming json
* @return VES format json
- * @throws ConfigFileReadException if unable to read the configuration file
* @throws ConfigFileSmooksConversionException if unable to convert config file data to smooks object
- * @throws VesException if unable to convert into ves json
+ *
*/
- String transform(String incomingJsonString, String eventType) throws ConfigFileReadException, ConfigFileSmooksConversionException, VesException;
+ String transform(String incomingJsonString, String eventType) throws ConfigFileSmooksConversionException, VesException;
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
index 318c8cd..65c7b9c 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
@@ -24,26 +24,25 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-
import org.milyn.Smooks;
import org.onap.dcaegen2.ves.domain.VesEvent;
-import org.onap.universalvesadapter.configs.UniversalEventConfiguration;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.service.ConfigFileService;
+import org.onap.universalvesadapter.service.VESAdapterInitializer;
import org.onap.universalvesadapter.utils.SmooksUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.SAXException;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSyntaxException;
/**
* Default implementation of the Generic Adapter
@@ -51,86 +50,88 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author kmalbari
*
*/
+
@Component
-public class UniversalEventAdapter implements GenericAdapter{
-
+public class UniversalEventAdapter implements GenericAdapter {
+
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private UniversalEventConfiguration configuration;
-
- @Resource(name="diskRepoConfigFileService")
- private ConfigFileService configFileService;
-
-// private Smooks smooks;
-
+
+ private String enterpriseId;
+ @Value("${defaultMappingFileName}")
+ private String defaulMappingFileName;
private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>();
- /*public String transform(String incomingJsonString) throws ConfigFileReadException {
- String result = "";
- try {
- //reading config file.. for now, looking at it as just one time operation
- if(null == smooks){
- String configFileData = configFileService.readConfigFile(configuration.getConfigFile());
- smooks = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
- }
-
- VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooks, incomingJsonString);
- ObjectMapper objectMapper = new ObjectMapper();
- result = objectMapper.writeValueAsString(vesEvent);
- } catch (IOException | SAXException e) {
- e.printStackTrace();
- }
-
- return result;
- }*/
-
+ public UniversalEventAdapter() {
+ }
+ /**
+ * transforms JSON to VES format and and returns the ves Event
+ *
+ * @param IncomingJason,eventType
+ * @return ves Event
+ */
@Override
- public String transform(String incomingJsonString, String eventType) throws ConfigFileReadException,
- ConfigFileSmooksConversionException, VesException {
+ public String transform(String incomingJsonString, String eventType)
+ throws ConfigFileSmooksConversionException, VesException {
String result = "";
+ String configFileData;
try {
- if(null == eventToSmooksMapping.get(eventType)){
- LOGGER.debug("No smooks mapping for this event type " + eventType + ".. reading config file");
- String configFileData = configFileService.readConfigFile(configuration.getConfigForEvent(eventType));
- LOGGER.debug("Read config file " + configFileData);
+
+ Gson gson = new Gson();
+ JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class);
+ JsonElement results = body.get("notify OID");
+ String notifyOid = results.getAsString();
+
+ // extracting enterprise id from notify OID of SNMP trap.
+ enterpriseId = notifyOid.substring(0, notifyOid.length() - 4);
+
+ if (VESAdapterInitializer.getMappingFiles().containsKey(enterpriseId)) {
+ configFileData = VESAdapterInitializer.getMappingFiles().get(enterpriseId);
+ LOGGER.debug("Using Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId);
+ } else {
+
+ configFileData = VESAdapterInitializer.getMappingFiles().get(defaulMappingFileName);
+ LOGGER.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId);
+ }
+
Smooks smooksTemp = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
eventToSmooksMapping.put(eventType, smooksTemp);
- LOGGER.debug("Added smooks mapping for event type" + eventType);
- }
-
-
- LOGGER.debug("Read smooks mapping for event type" + eventType);
- LOGGER.debug("Transforming incoming json now");
- VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(eventToSmooksMapping.get(eventType), incomingJsonString);
+
+ VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,incomingJsonString);
LOGGER.debug("Incoming json transformed to VES format successfully");
ObjectMapper objectMapper = new ObjectMapper();
- result = objectMapper.writeValueAsString(vesEvent);
+ result = objectMapper.writeValueAsString(vesEvent);
LOGGER.debug("Serialized VES json");
} catch (JsonProcessingException exception) {
- throw new VesException("Unable to convert pojo to VES format" + "\n Reason :" + exception.getMessage());
+ throw new VesException("Unable to convert pojo to VES format, Reason :{}", exception);
} catch (SAXException | IOException exception) {
- throw new ConfigFileSmooksConversionException("Unable to convert config file into smooks for event type " + eventType
- + "\n Reason :" + exception.getMessage());
+ //Invalid Mapping file
+ LOGGER.error("Dropping this Trap :{},due to error Occured :Reason:", incomingJsonString, exception);
+
+ } catch (JsonSyntaxException exception) {
+ // Invalid Trap
+ LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception);
+ }catch (JsonParseException exception) {
+ // Invalid Trap
+ LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception);
+ }
+ catch (RuntimeException exception) {
+
+ LOGGER.error("Dropping this Trap :{},Reason:", incomingJsonString, exception);
+
}
return result;
}
-
-
+
/**
* Closes all open smooks' instances before bean is destroyed
*/
@PreDestroy
- public void destroy(){
-// if(null != smooks)
-// smooks.close();
-
- for(Smooks smooks : eventToSmooksMapping.values())
+ public void destroy() {
+ for (Smooks smooks : eventToSmooksMapping.values())
smooks.close();
-
LOGGER.debug("All Smooks objects closed");
- }
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java
deleted file mode 100644
index e47af70..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-public abstract class Configuration {
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java
new file mode 100644
index 0000000..fd4185b
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java
@@ -0,0 +1,187 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.configs;
+
+
+
+import java.util.Locale;
+
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Objects;
+
+
+/**
+ * <p>
+ * Contains common parameters for both DMaaP Message Router Publisher and Subscriber Configs
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ */
+@Component
+public abstract class DMaaPMRBaseConfig {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DMaaPMRBaseConfig.class);
+
+ protected String hostName;
+ protected Integer portNumber;
+ protected String topicName;
+ protected String protocol;
+ protected String userName;
+ protected String userPassword;
+ protected String contentType;
+
+ /**
+ * Provides host name e.g. mrlocal-mtnjftle01.homer.com
+ *
+ * @return host name
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
+
+ /**
+ * Provides Port Number of DMaaP MR Topic Host. Defaults to 80
+ *
+ * @return host port number
+ */
+ public Integer getPortNumber() {
+ return portNumber;
+ }
+
+ /**
+ * Provides topic name e.g. com.dcae.dmaap.mtnje2.DcaeTestVES
+ *
+ * @return topic name
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /**
+ * Provides protocol type e.g. http or https
+ *
+ * @return protocol type
+ */
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * Provides content type e.g. application/json
+ *
+ * @return content type
+ */
+ public String getContentType() {
+ return contentType;
+ }
+
+
+ /**
+ * Provides User name for the DMaaP MR Topic authentication
+ *
+ * @return user name
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * Provides User password for the DMaaP MR Topic authentication
+ *
+ * @return user Password
+ */
+ public String getUserPassword() {
+ return userPassword;
+ }
+
+
+ /**
+ * Trims, adjusts casing and validates user input String for protocol selection
+ *
+ * @param protocol - User input for protocol String
+ * @return - network protocol e.g http or https
+ */
+ protected static String normalizeValidateProtocol(final String protocol) {
+ // validate that only http and https are supported protocols are Supported for DMaaP MR
+ String normalizedProtocolString = protocol.trim().toLowerCase(Locale.ENGLISH);
+ if (normalizedProtocolString.isEmpty() ||
+ !("http".equals(normalizedProtocolString) || "https".equals(normalizedProtocolString))) {
+
+ final String errorMessage =
+ "Unsupported protocol selection. Only HTTPS and HTTPS are currently supported for DMaaP MR";
+
+ throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return normalizedProtocolString;
+ }
+
+
+ /**
+ * Trims, adjust casing and validates content type is supported by DMaaP.
+ *
+ * NOTE: DMaaP currently only support application/json content type
+ *
+ * @param contentType content type that needs to checked for DMaaP MR support
+ * @return true if content type is supported by DMaaP MR
+ */
+ protected static String normalizeValidateContentType(final String contentType) {
+ // Current DMaaP MR is only supporting "application/json" content type
+ String normalizedContentType = contentType.trim().toLowerCase(Locale.ENGLISH);
+ final boolean isSupported = contentType.equals(HTTPUtils.JSON_APPLICATION_TYPE);
+ if (!isSupported) {
+ final String errorMessage =
+ "Unsupported content type selection. Only application/json is currently supported for DMaaP MR";
+
+ throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return normalizedContentType;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DMaaPMRBaseConfig)) {
+ return false;
+ }
+ DMaaPMRBaseConfig that = (DMaaPMRBaseConfig) o;
+ return Objects.equal(hostName, that.hostName) &&
+ Objects.equal(portNumber, that.portNumber) &&
+ Objects.equal(topicName, that.topicName) &&
+ Objects.equal(protocol, that.protocol) &&
+ Objects.equal(userName, that.userName) &&
+ Objects.equal(userPassword, that.userPassword) &&
+ Objects.equal(contentType, that.contentType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(hostName, portNumber, topicName, protocol, userName, userPassword, contentType);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
new file mode 100644
index 0000000..70fcf58
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
@@ -0,0 +1,255 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+
+//TO-DO Lisence Issue ASK to Kedar???????????
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for DMaaP MR Publisher.
+ * <p>
+ * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber
+ * Configuration
+ * </p>
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ *
+ */
+@ComponentScan
+public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig {
+
+
+ /**
+ * Publisher batching queue size
+ */
+ private int maxBatchSize;
+
+ /**
+ * Publisher Recovery Queue Size
+ */
+ private int maxRecoveryQueueSize;
+
+ /**
+ * Default uri path prefix
+ */
+ private String dmaapUriPathPrefix ;
+
+ private DMaaPMRPublisherConfig(@Nonnull String hostName, @Nonnull Integer portNumber, @Nonnull String topicName,
+ @Nonnull String protocol, String userName, String userPassword, @Nonnull String contentType,
+ int maxBatchSize, int maxRecoveryQueueSize,String dmaapUriPathPrefix) {
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+ this.topicName = topicName;
+ this.protocol = protocol;
+ this.userName = userName;
+ this.userPassword = userPassword;
+ this.contentType = contentType;
+ this.maxBatchSize = maxBatchSize;
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ this.dmaapUriPathPrefix =dmaapUriPathPrefix;
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private int maxBatchSize;
+ private int maxRecoveryQueueSize;
+ private String dmaapUriPathPrefix ;
+
+
+
+ public Builder(@Nonnull String hostName, @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+ this.hostName = hostName;
+ this.topicName = topicName;
+ // Default values
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol =dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+
+ this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE();
+ this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE();
+ this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+ }
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber
+ * custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication
+ * will be disabled
+ *
+ * @param userName
+ * user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ /**
+ * Setup user password for authentication. If no password is provided
+ * authentication will be disabled
+ *
+ * @param userPassword
+ * user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher protocol - Defaults to https. Note: Only http and
+ * https are currently supported.
+ *
+ * @param protocol
+ * protocol e.g. https
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher content-type - Defaults to application/json
+ *
+ * @param contentType
+ * content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher Max Batch Size - Defaults to 100
+ *
+ * @param maxBatchSize
+ * max Batch Size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ /**
+ * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold
+ * messages temporarily in case DMaaP MR Publisher topic is not responding for
+ * any reason. Defaults to 100,000
+ *
+ * @param maxRecoveryQueueSize
+ * max recovery queue size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) {
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ return this;
+ }
+
+
+ /**
+ * Creates immutable instance of {@link DMaaPMRPublisherConfig}
+ *
+ * @return Builds and returns thread safe, immutable
+ * {@link DMaaPMRPublisherConfig} object
+ */
+ public DMaaPMRPublisherConfig build() {
+ return new DMaaPMRPublisherConfig(hostName, portNumber, topicName, protocol, userName, userPassword,
+ contentType, maxBatchSize, maxRecoveryQueueSize,dmaapUriPathPrefix);
+ }
+
+ }
+ public String getDmaapUriPathPrefix() {
+ return dmaapUriPathPrefix;
+ }
+
+ /**
+ * Returns max Publisher Batch Queue Size
+ *
+ * @return max Publisher Batch Queue size
+ */
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /**
+ * Returns max Publisher Recovery Queue Size
+ *
+ * @return max Recovery Queue size
+ */
+ public int getMaxRecoveryQueueSize() {
+ return maxRecoveryQueueSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o;
+ return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
new file mode 100644
index 0000000..2b8158a
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
@@ -0,0 +1,337 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.onap.universalvesadapter.utils.DmaapConfig;
+
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for Subscriber.
+ * <p>
+ * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ */
+public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig {
+
+ private final String consumerId;
+ private final String consumerGroup;
+ private final int timeoutMS;
+ private final int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPrefix;
+
+ private DMaaPMRSubscriberConfig(@Nonnull String hostName,
+ @Nonnull Integer portNumber,
+ @Nonnull String topicName,
+ @Nonnull String protocol,
+ String userName,
+ String userPassword,
+ @Nonnull String contentType,
+ @Nonnull String consumerId,
+ @Nonnull String consumerGroup,
+ @Nonnull int timeoutMS,
+ @Nonnull int messageLimit,
+ String timeoutMSParam,
+ String messageLimitParam,
+ String uriPrefix) {
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+ this.topicName = topicName;
+ this.protocol = protocol;
+ this.userName = userName;
+ this.userPassword = userPassword;
+ this.contentType = contentType;
+ this.consumerId = consumerId;
+ this.consumerGroup = consumerGroup;
+ this.timeoutMS = timeoutMS;
+ this.messageLimit = messageLimit;
+ this.timeoutMSParam=timeoutMSParam;
+ this.messageLimitParam=messageLimitParam;
+ this.uriPrefix=uriPrefix;
+
+
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private String consumerId;
+ private String consumerGroup;
+ private int timeoutMS;
+ private int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPreifix;
+
+
+
+ public Builder(@Nonnull String hostName,
+ @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+
+ // Required Values
+ this.hostName = hostName;
+ this.topicName = topicName;
+ // Default values
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol = dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+ this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID();
+ this.consumerGroup = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
+ this.timeoutMS =dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
+ this.messageLimit = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
+ this.timeoutMSParam=dmaapConfig.getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
+ this.messageLimitParam=dmaapConfig.getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
+ this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+
+ }
+
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication will be disabled
+ *
+ * @param userName user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+
+ /**
+ * Setup user password for authentication. If no password is provided authentication will be disabled
+ *
+ * @param userPassword user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Subscriber protocol - Defaults to https.
+ * Note: Only http and https are currently supported.
+ *
+ * @param protocol protocol e.g. https or http
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber content-type - Defaults to application/json
+ *
+ * @param contentType content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Consumer Id - Defaults to random Id
+ *
+ * @param consumerId - custom consumer ID
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerId(@Nonnull String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ /**
+ * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID
+ *
+ * @param consumerGroup - custom Consumer Group
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerGroup(@Nonnull String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ /**
+ * Setup Custom Subscriber timeout in ms - Default to no timeout limit
+ *
+ * @param timeoutMS timeout in milliseconds
+ * @return Builder object itself for chaining
+ */
+ public Builder setTimeoutMS(@Nonnull int timeoutMS) {
+ this.timeoutMS = timeoutMS;
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber Message Limit - Default to no limit
+ *
+ * @param messageLimit message Limit
+ * @return Builder object itself for chaining
+ */
+ public Builder setMessageLimit(@Nonnull int messageLimit) {
+ this.messageLimit = messageLimit;
+ return this;
+ }
+
+ /**
+ * Builds Immutable instance of {@link DMaaPMRSubscriberConfig}
+ *
+ * @return immutable DMaaP Subscriber Config Object
+ * @throws IOException
+ */
+ public DMaaPMRSubscriberConfig build() throws IOException {
+ return new DMaaPMRSubscriberConfig(hostName, portNumber, topicName, protocol, userName, userPassword,
+ contentType, consumerId, consumerGroup, timeoutMS, messageLimit, timeoutMSParam, messageLimitParam,
+ uriPreifix);
+ }
+
+ }
+
+
+ public String getTimeoutMSParam() {
+ return timeoutMSParam;
+ }
+
+ public void setTimeoutMSParam(String timeoutMSParam) {
+ this.timeoutMSParam = timeoutMSParam;
+ }
+
+ public String getMessageLimitParam() {
+ return messageLimitParam;
+ }
+
+ public void setMessageLimitParam(String messageLimitParam) {
+ this.messageLimitParam = messageLimitParam;
+ }
+
+ public String getUriPrefix() {
+ return uriPrefix;
+ }
+
+ public void setUriPrefix(String uriPreifix) {
+ this.uriPrefix = uriPreifix;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Id
+ *
+ * @return consumer Id
+ */
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Group
+ *
+ * @return consumer group
+ */
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ /**
+ * DMaaP MR Subscriber Timeout in ms
+ *
+ * @return subscriber timeout ms
+ */
+ public int getTimeoutMS() {
+ return timeoutMS;
+ }
+
+ /**
+ * DMaaP MR Subscriber message limit
+ *
+ * @return subscriber message limit
+ */
+ public int getMessageLimit() {
+ return messageLimit;
+ }
+
+ /**
+ * DMaaP property file
+ *
+ * @return Properties
+ */
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o;
+ return Objects.equal(consumerId, that.consumerId) &&
+ Objects.equal(consumerGroup, that.consumerGroup) &&
+ Objects.equal(timeoutMS, that.timeoutMS) &&
+ Objects.equal(messageLimit, that.messageLimit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java
deleted file mode 100644
index 2b9a820..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * Configuration for Dmaap MR Service
- *
- * @author kmalbari
- *
- */
-@Component
-public class DMaapMrUrlConfiguration extends Configuration {
-
- @Value("${dmaap.url}")
- private String url;
-
- @Value("${dmaap.consumer_props}")
- private String consumerProperties;
-
- @Value("${dmaap.publisher_props}")
- private String publisherProperties;
-
- public String getPublisherProperties() {
- return publisherProperties;
- }
-
- public String getConsumerProperties() {
- return consumerProperties;
- }
-
- public String getUrl() {
- return url;
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java
deleted file mode 100644
index b1daf0d..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- *
- * Configuration for disk repository service
- *
- * @author kmalbari
- *
- */
-@Component
-public class DiskRepoConfiguration extends Configuration {
-
- @Value("${fileService.url}")
- private String fileRepositoryUrl;
-
- public String getFileRepositoryUrl() {
- return fileRepositoryUrl;
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
index 3edca56..6f85ef3 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
@@ -33,12 +33,9 @@ import org.springframework.stereotype.Component;
*
*/
@Component
-public class UniversalEventConfiguration extends Configuration {
+public class UniversalEventConfiguration{
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Value("${snmpTrap.configFile}")
- private String configFile;
@Value("${universal.configFiles}")
private String configFiles;
@@ -68,6 +65,4 @@ public class UniversalEventConfiguration extends Configuration {
}
- //think about adding mapping files on runtime as well
-
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java
deleted file mode 100644
index e844270..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.controller;
-
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
-
-
-@RestController
-public class MockDmaapController {
-
- @RequestMapping("/greeting")
- public String greeting(@RequestParam(value="name", defaultValue="World") String name) {
-// return new Greeting(counter.incrementAndGet(),
-// String.format(template, name));
- return "{ \"protocol version\": \"v2c\", \"notify OID\": \".1.3.6.1.4.1.74.2.46.12.1.1\", \"cambria.partition\": \"dcae-snmp.client.research.att.com\" , \"trap category\": \"UCSNMP-HEARTBEAT\", \"epoch_serno\": 15161177410000, \"community\" : \"public\", \"time received\": 1516117741, \"agent name\": \"localhost\", \"agent address\": \"127.0.0.1\", \"community len\": 6, \"notify OID len\": 12, \"varbinds\": [ { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.1\", \"varbind_value\": \"ucsnmp heartbeat - ignore\" }, { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.2\", \"varbind_value\": \"Tue Jan 16 10:49:01 EST 2018\" } ] }";
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
index 588c912..dfddfde 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
@@ -19,6 +19,8 @@
*/
package org.onap.universalvesadapter.controller;
+import org.onap.universalvesadapter.exception.MapperConfigException;
+import org.onap.universalvesadapter.service.VESAdapterInitializer;
import org.onap.universalvesadapter.service.VesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController;
*
* @author kmalbari
*/
+
@RestController
public class VesController {
@@ -40,17 +43,35 @@ public class VesController {
@Autowired
private VesService vesService;
+ @Autowired
+ private VESAdapterInitializer vESAdapterInitializer;
+
/**
* @return message that application is started
*/
@RequestMapping("/start")
public String start() {
- LOGGER.debug("UniversalVesAdapter Application starting...");
- vesService.start();
+ LOGGER.info("UniversalVesAdapter Application starting...");
+
+ try {
+ vesService.start();
+ } catch (MapperConfigException e) {
+
+ LOGGER.error("Config error:{}",e.getMessage(),e.getCause());
+ }
return "Application started";
}
+ @RequestMapping("/reload")
+ public void reloadMappingFileFromDB() {
+ LOGGER.debug("Reload of Mapping File is started");
+ vESAdapterInitializer.fetchMappingFile();
+ LOGGER.debug("Reload of Mapping File is completed");
+ }
+
+
+
/**
* @return message that application stop process is triggered
*/
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java
new file mode 100644
index 0000000..efd0b0d
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java
@@ -0,0 +1,68 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherImpl;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueFactory;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueImpl;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberImpl;
+import org.springframework.stereotype.Component;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+
+
+/**
+ * Guice Module to wire concrete implementations with interfaces
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+@Component
+public class AnalyticsDMaaPModule extends AbstractModule {
+
+
+ @Override
+ protected void configure() {
+
+ // Bind Http Client
+ bind(CloseableHttpClient.class).toInstance(HttpClients.createDefault());
+
+ // Bind Publishing queue
+ install(new FactoryModuleBuilder().implement(DMaaPMRPublisherQueue.class, DMaaPMRPublisherQueueImpl.class)
+ .build(DMaaPMRPublisherQueueFactory.class));
+
+ install(new FactoryModuleBuilder().implement(DMaaPMRPublisher.class, DMaaPMRPublisherImpl.class)
+ .build(DMaaPMRPublisherFactory.class));
+
+ install(new FactoryModuleBuilder().implement(DMaaPMRSubscriber.class, DMaaPMRSubscriberImpl.class)
+ .build(DMaaPMRSubscriberFactory.class));
+
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java
new file mode 100644
index 0000000..0638574
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java
@@ -0,0 +1,383 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+
+import static java.lang.String.format;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponse;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponseImpl;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponse;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponseImpl;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.ComponentScan;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+
+/**
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+@ComponentScan
+public abstract class BaseDMaaPMRComponent {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ public BaseDMaaPMRComponent() {}
+
+
+ /**
+ * Creates Base64 encoded Auth Header for given userName and Password
+ * If either user name of password are null return absent
+ *
+ * @param userName username
+ * @param userPassword user password
+ * @return base64 encoded auth header if username or password are both non null
+ */
+ protected static Optional<String> getAuthHeader(@Nullable final String userName,
+ @Nullable final String userPassword) {
+ if (userName == null || userPassword == null) {
+ return Optional.absent();
+ } else {
+ final String auth = userName + ":" + userPassword;
+ final Charset isoCharset = Charset.forName("ISO-8859-1");
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
+ return Optional.of("Basic " + new String(encodedAuth, isoCharset));
+ }
+ }
+
+
+ /**
+ * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
+ *
+ * @param publisherConfig publisher settings
+ *
+ * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
+ */
+ protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
+ final String hostName = publisherConfig.getHostName();
+ final Integer portNumber = publisherConfig.getPortNumber();
+ final String getProtocol = publisherConfig.getProtocol();
+ final String topicName = publisherConfig.getTopicName();
+ final String dmaapUriPathPrefix =publisherConfig.getDmaapUriPathPrefix();
+ URI publisherURI = null;
+ try {
+ publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(dmaapUriPathPrefix + topicName).build();
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating publisher URI: %s", e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
+ return publisherURI;
+ }
+
+
+ /**
+ * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
+ *
+ * @param subscriberConfig subscriber settings
+ *
+ * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
+ */
+ protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
+ final String hostName = subscriberConfig.getHostName();
+ final Integer portNumber = subscriberConfig.getPortNumber();
+ final String getProtocol = subscriberConfig.getProtocol();
+ final String topicName = subscriberConfig.getTopicName();
+ final String consumerId = subscriberConfig.getConsumerId();
+ final String consumerGroup = subscriberConfig.getConsumerGroup();
+ final Integer timeoutMS = subscriberConfig.getTimeoutMS();
+ final Integer messageLimit = subscriberConfig.getMessageLimit();
+ URI subscriberURI = null;
+
+ try {
+ URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(subscriberConfig.getUriPrefix()
+ + topicName + "/"
+ + consumerGroup + "/" +
+ consumerId);
+ // add query params if present
+ if (timeoutMS > 0) {
+ uriBuilder.addParameter(subscriberConfig.getTimeoutMSParam(), timeoutMS.toString());
+ }
+ if (messageLimit > 0) {
+ uriBuilder.addParameter(subscriberConfig.getMessageLimitParam(),
+ messageLimit.toString());
+ }
+ subscriberURI = uriBuilder.build();
+
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating subscriber URI: %s", e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
+ return subscriberURI;
+ }
+
+
+ /**
+ * Creates 202 (Accepted) Response code message
+ *
+ * @param batchQueueSize batch Queue size
+ *
+ * @return response with 202 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
+ return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
+ "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
+ }
+
+
+ /**
+ * Creates 204 (No Content) Response code message
+ *
+ * @return response with 204 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
+ return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
+ "No Content - No Messages in batch queue for flushing to MR Topic", 0);
+ }
+
+
+ /**
+ * Creates Publisher Response for given response code, response Message and pending Message Count
+ *
+ * @param responseCode HTTP Status Code
+ * @param responseMessage response message
+ * @param pendingMessages pending messages in batch queue
+ *
+ * @return DMaaP MR Publisher Response
+ */
+ protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
+ responseMessage, int pendingMessages) {
+ return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
+ }
+
+
+ /**
+ * Returns weekly consistent pending messages in batch queue
+ *
+ * @param publisherQueue batch queue
+ * @param publisherConfig publisher settings
+ *
+ * @return pending messages to be published
+ */
+ protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
+ @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
+ return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
+ }
+
+
+ /**
+ * Creates Subscriber Response for give response Code, response Message and fetch messages
+ *
+ * @param responseCode response Code
+ * @param responseMessage response Message
+ * @param fetchedMessages fetched messages
+ *
+ * @return DMaaP MR Subscriber Response
+ */
+ protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
+ responseMessage, List<String> fetchedMessages) {
+ if (fetchedMessages == null) {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
+ } else {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
+ }
+ }
+
+
+ /**
+ * Custom response handler which extract status code and response body
+ *
+ * @return Pair containing Response code and response body
+ */
+ protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
+ return new ResponseHandler<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
+ // Get Response status code
+ final int status = response.getStatusLine().getStatusCode();
+ final HttpEntity responseEntity = response.getEntity();
+ // If response entity is not null - extract response body as string
+ String responseEntityString = "";
+ if (responseEntity != null) {
+ responseEntityString = EntityUtils.toString(responseEntity);
+ }
+ return new ImmutablePair<>(status, responseEntityString);
+ }
+ };
+ }
+
+
+ /**
+ * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
+ * be lost
+ *
+ * @param publisherQueue publisher queue
+ * @param messages recoverable messages to be published to recovery queue
+ */
+ protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
+ List<String> messages) {
+ try {
+ publisherQueue.addRecoverableMessages(messages);
+
+ LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
+ messages.size(), publisherQueue.getBatchQueueRemainingSize());
+
+ } catch (IllegalStateException e) {
+ final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
+ "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
+ messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ }
+
+
+
+
+
+ /**
+ * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
+ *
+ * @param messages messages that need to parsed to Json Array representation
+ * @return json string representation of message
+ */
+ protected static String convertToJsonString(@Nullable final List<String> messages) {
+ // If messages are null or empty just return empty array
+ if (messages == null || messages.isEmpty()) {
+ return "[]";
+ }
+
+
+ List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
+
+ try {
+ for (String message : messages) {
+ final JsonNode jsonNode = objectMapper.readTree(message);
+ jsonMessageObjectsList.add(jsonNode);
+ }
+ return objectMapper.writeValueAsString(jsonMessageObjectsList);
+ } catch (JsonProcessingException e) {
+ final String errorMessage =
+ format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DMaapException(errorMessage, LOG, e);
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts subscriber messages json string to List of messages. If message Json String is empty
+ * or null
+ *
+ * @param messagesJsonString json messages String
+ *
+ * @return List containing DMaaP MR Messages
+ */
+ protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+
+ final LinkedList<String> messages = new LinkedList<>();
+
+ // If message string is not null or not empty parse json message array to List of string messages
+ if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
+ && !("[]").equals(messagesJsonString.trim())) {
+
+ try {
+ // get root node
+ final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+ // iterate over root node and parse arrays messages
+ for (JsonNode jsonNode : rootNode) {
+ // if array parse it is array of messages
+ final String incomingMessageString = jsonNode.toString();
+ if (jsonNode.isArray()) {
+ final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+ for (Object message : messageList) {
+ final String jsonMessageString = objectMapper.writeValueAsString(message);
+ addUnescapedJsonToMessage(messages, jsonMessageString);
+ }
+ } else {
+ // parse it as object
+ addUnescapedJsonToMessage(messages, incomingMessageString);
+ }
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
+ " Json Error: %s", messagesJsonString, e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+ return messages;
+ }
+
+ /**
+ * Adds unescaped Json messages to given messages list
+ *
+ * @param messages message list in which unescaped messages will be added
+ * @param incomingMessageString incoming message string that may need to be escaped
+ */
+ private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+ if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+ messages.add(StringEscapeUtils.unescapeJava(
+ incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+ } else {
+ messages.add(StringEscapeUtils.unescapeJava(incomingMessageString));
+ }
+ }
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
new file mode 100644
index 0000000..cd76f79
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
@@ -0,0 +1,93 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.dmaap;
+
+import java.io.IOException;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class Creator {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(Creator.class);
+ private DMaaPMRFactory dMaaPMRFactoryInstance;
+ private String hostname;
+ private String publisherTopic;
+ private String subcriberTopic;
+ private DmaapConfig dmaapConfig;
+
+ @Autowired
+ public void setDmaapConfig(DmaapConfig dmaapConfig) {
+ this.dmaapConfig = dmaapConfig;
+ }
+
+ public Creator() {
+
+ }
+
+ // prop initializer
+
+ public void propertyFileInitializer() {
+
+ this.hostname = dmaapConfig.getHostname();
+ this.publisherTopic = dmaapConfig.getPublisherTopic();
+ this.subcriberTopic = dmaapConfig.getSubscriberTopic();
+ this.dMaaPMRFactoryInstance = DMaaPMRFactory.create();
+ LOGGER.info("The Hostname of DMaap is :" + hostname);
+
+
+ }
+
+ // publisher
+ public DMaaPMRPublisher getDMaaPMRPublisher(){
+ propertyFileInitializer();
+ DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null;
+ try {
+ dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(hostname, publisherTopic,dmaapConfig).build();
+ } catch (IOException e) {
+ LOGGER.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause());
+ }
+ return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig);
+ }
+
+ // subscriber
+ public DMaaPMRSubscriber getDMaaPMRSubscriber(){
+ propertyFileInitializer();
+ DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null;
+ try {
+ dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(hostname, subcriberTopic, dmaapConfig).build();
+ } catch (IOException e) {
+
+ LOGGER.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause());
+ }
+
+ return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig);
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java
new file mode 100644
index 0000000..50f214c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java
@@ -0,0 +1,116 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+import javax.annotation.Nonnull;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+
+/**
+ * Creates pre injected implementations for {@link DMaaPMRPublisher} and {@link DMaaPMRSubscriber}
+ * <p>
+ * Usage:
+ * <p>Create an instance of DMaaP MR Factory</p>
+ * <pre>
+ * DMaaPFactory dmaapFactory = DMaaPFactory.initalize()
+ * </pre>
+ * <p>Create a new DMaaP MR Publisher</p>
+ * <pre>
+ * DMaaPMRPublisher publisher = dmaapFactory.createPublisher(publisherConfig)
+ * </pre>
+ * <p>Create new DMaaP MR Subscriber</p>
+ * <pre>
+ * DMaaPMRSubscriber subscriber = dmaapFactory.createSubscriber(subscriberConfig)
+ * </pre>
+ * <p>
+ * <strong>All Clients must use this Factory to initalize DMaaP Message Router Publishers and Subscribers</strong>
+ * </p>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+
+public class DMaaPMRFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRFactory.class);
+
+ private final Injector injector;
+
+ public DMaaPMRFactory(AbstractModule guiceModule) {
+ injector = Guice.createInjector(guiceModule);
+ }
+
+ /**
+ * Returns configured instance of {@link DMaaPMRPublisher}
+ *
+ * @param publisherConfig Publisher Config
+ * @return configured instance of DMaaP MR Publisher
+ */
+ public DMaaPMRPublisher createPublisher(@Nonnull DMaaPMRPublisherConfig publisherConfig) {
+ final DMaaPMRPublisherFactory publisherFactory = injector.getInstance(Key.get(DMaaPMRPublisherFactory.class));
+ LOG.debug("Creating new DMaaP MR Publisher Instance with configuration: {}", publisherConfig);
+ final DMaaPMRPublisher dMaaPMRPublisher = publisherFactory.create(publisherConfig);
+
+ LOG.info("Created new DMaaP MR Publisher Instance. Publisher creation time: {}",
+ dMaaPMRPublisher.getPublisherCreationTime());
+ return dMaaPMRPublisher;
+ }
+
+ /**
+ * Returns configured instance of {@link DMaaPMRSubscriber}
+ *
+ * @param subscriberConfig Subscriber Config
+ * @return configured instance of DMaaP MR Subscriber
+ */
+ public DMaaPMRSubscriber createSubscriber(@Nonnull DMaaPMRSubscriberConfig subscriberConfig) {
+ final DMaaPMRSubscriberFactory subscriberFactory = injector.getInstance(DMaaPMRSubscriberFactory.class);
+ LOG.debug("Creating new DMaaP MR Subscriber Instance with configuration: {}", subscriberConfig);
+ final DMaaPMRSubscriber dMaaPMRSubscriber = subscriberFactory.create(subscriberConfig);
+ LOG.info("Created new DMaaP MR Subscriber Instance. Subscriber creation time: {}",
+ dMaaPMRSubscriber.getSubscriberCreationTime());
+ return dMaaPMRSubscriber;
+ }
+
+ /**
+ * Creates an instance of {@link DMaaPMRFactory}
+ *
+ * @return {@link DMaaPMRFactory} factory instance
+ */
+ public static DMaaPMRFactory create() {
+ final DMaaPMRFactory dMaaPMRFactory = new DMaaPMRFactory(new AnalyticsDMaaPModule());
+ LOG.info("Created new instance of DMaaP MR Factory");
+ return dMaaPMRFactory;
+ }
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java
new file mode 100644
index 0000000..6d867b9
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java
@@ -0,0 +1,91 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * <p>
+ * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisher extends AutoCloseable {
+
+
+ /**
+ * <p>
+ * Adds collection of messages to DMaaP MR Topic Publishing Queue.
+ * <p>
+ * Note: Invoking this method may or may not cause publishing immediately
+ * as publishing in done is batch mode by default. Parameter maxBatchSize
+ * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size.
+ * If the maxBatchSize is reached all message will be published automatically
+ * during subsequent call.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return response which may contain Http Response code 202 (Accepted) as publishing
+ * will proceed when max batch size is reached. Throws {@link DMaapException}
+ * if publishing fails
+ */
+ DMaaPMRPublisherResponse publish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse}
+ * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return DMaaP Message Router Publisher Response. Throws {@link DMaapException}
+ * if force publishing fails
+ *
+ */
+ DMaaPMRPublisherResponse forcePublish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns
+ * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to
+ * be flushed response code 304 (Not Modified) will be returned
+ * </p>
+ *
+ * @return DMaaP Message Router Publisher Response
+ */
+ DMaaPMRPublisherResponse flush();
+
+
+ /**
+ * <p>
+ * Returns the creation time when Publisher instance was created.
+ * <p>
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getPublisherCreationTime();
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java
new file mode 100644
index 0000000..b60c74c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java
@@ -0,0 +1,48 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Publisher instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRPublisherFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher
+ *
+ * @param publisherConfig publisher config
+ *
+ * @return DMaaP MR Publisher instance
+ */
+ DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig);
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java
new file mode 100644
index 0000000..5930f41
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java
@@ -0,0 +1,220 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import static java.lang.String.format;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+
+/**
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+@ComponentScan
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
+
+ @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
+ private int publisherMaxFlushRetries;
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
+
+ public int a =2;
+ private final DMaaPMRPublisherConfig publisherConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final DMaaPMRPublisherQueue publisherQueue;
+ private final Date publisherCreationTime;
+ private URI publisherUri;
+
+ @Inject
+ public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
+ DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
+ CloseableHttpClient closeableHttpClient){
+
+ this.publisherConfig = publisherConfig;
+ final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
+ this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
+ maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
+ this.closeableHttpClient = closeableHttpClient;
+ this.publisherUri = createPublisherURI(publisherConfig);
+ this.publisherCreationTime = new Date();
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse publish(List<String> messages) {
+
+ final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
+
+ // if messages size is less than batch queue size - just queue them for batch publishing
+ if (batchQueueRemainingSize > messages.size()) {
+ LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
+ messages.size(), batchQueueRemainingSize);
+ final int batchQueueSize = publisherQueue.addBatchMessages(messages);
+ return createPublisherAcceptedResponse(batchQueueSize);
+
+ } else {
+
+ // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
+ "Publisher Topic.");
+ return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
+ }
+
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
+
+ LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
+
+ final String contentType = publisherConfig.getContentType();
+ final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName();
+ final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword();
+ final HttpPost postRequest = new HttpPost(publisherUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
+ }
+
+ // Create post string entity
+ final String messagesJson = convertToJsonString(messages);
+ final StringEntity requestEntity =
+ new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
+ postRequest.setEntity(requestEntity);
+
+ try {
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+ // if messages were published successfully, return successful response
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
+ "Body: {}, Number of Messages published: {}",
+ responseCode, responseBody, messages.size());
+
+ } else {
+ LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
+ "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ }
+
+ return createPublisherResponse(responseCode, responseBody,
+ getPendingMessages(publisherQueue, publisherConfig));
+
+ } catch (IOException e) {
+ // If IO Error then we need to also put messages in recovery queue
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
+ "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
+
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse flush() {
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ // If there are no message return 204 (No Content) response code
+ if (queueMessages.isEmpty()) {
+ LOG.debug("No messages to publish to batch queue. Returning 204 status code");
+ return createPublisherNoContentResponse();
+ } else {
+ // force publish messages in queue
+ return forcePublish(queueMessages);
+ }
+ }
+
+ @Override
+ public Date getPublisherCreationTime() {
+ return new Date(publisherCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ // flush current message in the queue
+ int retrialNumber = 0;
+ int flushResponseCode;
+
+ // automatic retries if messages cannot be flushed
+ do {
+ retrialNumber++;
+ DMaaPMRPublisherResponse flushResponse = flush();
+ flushResponseCode = flushResponse.getResponseCode();
+
+ if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
+ "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
+ publisherMaxFlushRetries);
+
+ Thread.sleep(publisherMaxFlushRetries);
+ }
+ } while (retrialNumber <= publisherMaxFlushRetries &&
+ !HTTPUtils.isSuccessfulResponseCode(flushResponseCode));
+
+ if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
+ } else {
+ LOG.info("Successfully published all batched messages to publisher.");
+ }
+
+ // close http client
+ closeableHttpClient.close();
+
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java
new file mode 100644
index 0000000..18feec8
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java
@@ -0,0 +1,87 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic
+ * is offline for some reason. It does so by having a recovery queue which keeps
+ * messages in order in case there is temporary interruption in DMaaP Publisher
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueue {
+
+ /**
+ * <p>
+ * Add batchMessages to Batch Queue
+ * </p>
+ *
+ * @param batchMessages messages that needs to be added to batch queue
+ * @return current size of batch queue. Throws {@link IllegalStateException}
+ * if batch queue does not have enough space
+ */
+ int addBatchMessages(List<String> batchMessages);
+
+
+ /**
+ * <p>
+ * Add recoverable messages to Recoverable Queue
+ * </p>
+ *
+ * @param recoverableMessages messages that needs to be added to recoverable queue
+ * @return current size of the recoverable queue. Throws {@link IllegalStateException}
+ * if recoverable queue does not have enough space
+ */
+ int addRecoverableMessages(List<String> recoverableMessages);
+
+ /**
+ * <p>
+ * Get messages that need to be published to DMaaP topic. Messages in recoverable
+ * queue are appended if present.
+ * </p>
+ *
+ * @return List of messages from both batch and recovery queue
+ */
+ List<String> getMessageForPublishing();
+
+ /**
+ * <p>
+ * Remaining capacity of Batch Queue
+ * </p>
+ *
+ * @return Remaining Batch Queue Size
+ */
+ int getBatchQueueRemainingSize();
+
+ /**
+ * <p>
+ * Remaining capacity of Recovery Queue
+ * </p>
+ *
+ * @return Remaining Recovery Queue Size
+ */
+ int getRecoveryQueueRemainingSize();
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java
new file mode 100644
index 0000000..f100210
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java
@@ -0,0 +1,45 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueueFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher Queue
+ *
+ * @param batchQueueSize batch queue size
+ * @param recoveryQueueSize recovery queue size
+ *
+ * @return instance of DMaaP MR Publisher Queue
+ */
+ DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize);
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java
new file mode 100644
index 0000000..fc240d6
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java
@@ -0,0 +1,126 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newLinkedList;
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * <p>
+ * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
+ * for batch and recovery queues
+ * </p>
+ *
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
+
+ private final LinkedBlockingDeque<String> batchQueue;
+ private final LinkedBlockingDeque<String> recoveryQueue;
+
+ @Inject
+ public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize) {
+ batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
+ recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
+ LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
+ batchQueueSize, recoveryQueueSize);
+ }
+
+ @Override
+ public synchronized int addBatchMessages(List<String> batchMessages) {
+
+ // checks if batchMessages size does not exceed batch queue capacity
+ if (batchMessages.size() > batchQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue");
+ }
+
+ // Add batchMessages to batch queue
+ for (String message : batchMessages) {
+ batchQueue.add(message);
+ }
+
+ // returns current elements size in batch queue
+ return batchQueue.size();
+ }
+
+ @Override
+ public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
+
+ // checks if messages size does not exceed recovery queue size
+ if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
+ }
+
+ // add messages to recovery queue
+ for (String recoverableMessage : recoverableMessages) {
+ recoveryQueue.add(recoverableMessage);
+ }
+
+ // returns current size of recovery queue
+ return recoveryQueue.size();
+ }
+
+ @Override
+ public synchronized List<String> getMessageForPublishing() {
+
+ final List<String> recoveryMessageList = new LinkedList<>();
+ final List<String> batchMessagesList = new LinkedList<>();
+
+ // get messages from recovery queue if present
+ if (!recoveryQueue.isEmpty()) {
+ final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
+ LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
+ }
+
+ // get messages from batch queue if present
+ if (!batchQueue.isEmpty()) {
+ final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
+ LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
+ }
+
+ // concat recovery and batch queue elements
+ return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
+ }
+
+ @Override
+ public synchronized int getBatchQueueRemainingSize() {
+ return batchQueue.remainingCapacity();
+ }
+
+ @Override
+ public synchronized int getRecoveryQueueRemainingSize() {
+ return recoveryQueue.remainingCapacity();
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java
new file mode 100644
index 0000000..ee12cc8
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java
@@ -0,0 +1,50 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+/**
+ * <p>
+ * Contract for all DMaaPMR Publisher Response
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisherResponse {
+
+ /**
+ * Gets HTTP Response Code
+ *
+ * @return HTTP Response code as String
+ */
+ Integer getResponseCode();
+
+ /**
+ * Gets Response Message
+ *
+ * @return Response Message
+ */
+ String getResponseMessage();
+ /**
+ * Gets number of pending messages
+ *
+ * @return pending messages in the batch queue
+ */
+ int getPendingMessagesCount();
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java
new file mode 100644
index 0000000..4570f78
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java
@@ -0,0 +1,60 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import javax.annotation.Nonnull;
+
+/**
+ * <p>
+ * An simple implementation of {@link DMaaPMRPublisherResponse}
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRPublisherResponseImpl implements DMaaPMRPublisherResponse {
+
+ private final Integer responseCode;
+ private final String responseMessage;
+ private final int pendingMessagesCount;
+
+ public DMaaPMRPublisherResponseImpl(@Nonnull Integer responseCode, @Nonnull String responseMessage,
+ int pendingMessagesCount) {
+ this.responseCode = responseCode;
+ this.responseMessage = responseMessage;
+ this.pendingMessagesCount = pendingMessagesCount;
+ }
+
+ @Override
+ public Integer getResponseCode() {
+ return responseCode;
+ }
+
+ @Override
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ @Override
+ public int getPendingMessagesCount() {
+ return pendingMessagesCount;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java
new file mode 100644
index 0000000..336e3dd
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java
@@ -0,0 +1,56 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+
+
+import java.util.Date;
+
+/**
+ * <p>
+ * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRSubscriber extends AutoCloseable {
+
+ /**
+ * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters
+ * for messageLimit and message timeout are used
+ *
+ * @return DMaaP Message Router Subscriber Response
+ */
+ DMaaPMRSubscriberResponse fetchMessages();
+
+
+ /**
+ * Returns the Subscriber instance creation time
+ * <p>
+ * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which
+ * are published to the topic after the creation of the Subscriber.
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getSubscriberCreationTime();
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java
new file mode 100644
index 0000000..dcb6f4c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java
@@ -0,0 +1,46 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+
+/**
+ * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Subscriber instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRSubscriberFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Subscriber Instance
+ *
+ * @param subscriberConfig subscriber config
+ *
+ * @return DMaaP MR Subscriber instance
+ */
+ DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig);
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java
new file mode 100644
index 0000000..2e7aac9
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java
@@ -0,0 +1,128 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRSubscriber} which uses
+ * {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
+
+ private final DMaaPMRSubscriberConfig subscriberConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final URI subscriberUri;
+ private final Date subscriberCreationTime;
+
+ @Inject
+ public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
+ CloseableHttpClient closeableHttpClient) {
+ this.subscriberConfig = subscriberConfig;
+ this.closeableHttpClient = closeableHttpClient;
+ this.subscriberUri = createSubscriberURI(subscriberConfig);
+ this.subscriberCreationTime = new Date();
+ }
+
+ @Override
+ public DMaaPMRSubscriberResponse fetchMessages() {
+
+ final String userName = (subscriberConfig.getUserName().equals("null")) ? null : subscriberConfig.getUserName();
+ final String userPassword = (subscriberConfig.getUserPassword().equals("null")) ? null
+ : subscriberConfig.getUserPassword();
+ final HttpGet getRequest = new HttpGet(subscriberUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
+ }
+
+ try {
+
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+
+ List<String> fetchedMessages = new LinkedList<>();
+ String responseMessage = responseBody;
+
+ // if messages were published successfully, return successful response
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ if (responseBody != null) {
+ fetchedMessages = convertJsonToStringMessages(responseBody);
+ responseMessage = "Messages Fetched Successfully";
+ } else {
+ responseMessage = "DMaaP Response Body had no messages";
+ }
+ } else {
+ LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, "
+ + "DMaaP Response Body: {}", responseCode, responseBody);
+ }
+
+ return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
+
+ } catch (IOException e) {
+
+ final String errorMessage = format("IO Exception while fetching messages from DMaaP Topic. Exception %s",
+ e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+
+ @Override
+ public Date getSubscriberCreationTime() {
+ return new Date(subscriberCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeableHttpClient.close();
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java
new file mode 100644
index 0000000..98b6b01
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java
@@ -0,0 +1,53 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import java.util.List;
+
+/**
+ * <p>
+ * Contract for all DMaaP MR Subscriber Responses
+ * </p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRSubscriberResponse {
+
+ /**
+ * Gets HTTP Response Code
+ *
+ * @return HTTP Response code as String
+ */
+ Integer getResponseCode();
+
+ /**
+ * Gets Response Message
+ *
+ * @return Response Message
+ */
+ String getResponseMessage();
+ /**
+ * Returns message fetched from DMaaP MR Topic
+ *
+ * @return collection of actual message retrieved from DMaaP MR Topic
+ */
+ List<String> getFetchedMessages();
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java
new file mode 100644
index 0000000..e318359
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java
@@ -0,0 +1,79 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * <p>
+ * A simple implementation for {@link DMaaPMRSubscriberResponse}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberResponseImpl implements DMaaPMRSubscriberResponse {
+
+ private final Integer responseCode;
+ private final String responseMessage;
+ private final List<String> fetchedMessages;
+
+ public DMaaPMRSubscriberResponseImpl(@Nonnull Integer responseCode,
+ @Nonnull String responseMessage,
+ @Nullable List<String> fetchedMessages) {
+ this.responseCode = responseCode;
+ this.responseMessage = responseMessage;
+ this.fetchedMessages = fetchedMessages != null ? fetchedMessages : ImmutableList.<String>of();
+ }
+
+ public DMaaPMRSubscriberResponseImpl(Integer responseCode, String responseMessage) {
+ this(responseCode, responseMessage, null);
+ }
+
+ @Override
+ public Integer getResponseCode() {
+ return responseCode;
+ }
+
+ @Override
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ @Override
+ public List<String> getFetchedMessages() {
+ return unmodifiableList(fetchedMessages);
+ }
+
+ @Override
+ public String toString()
+ {
+
+ return responseMessage;
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java
deleted file mode 100644
index 796fe70..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.domain;
-
-import org.springframework.data.annotation.Id;
-
-/**
- * A domain wrapper class for saving the config file in Mongo DB
- *
- * @author kmalbari
- *
- */
-public class ConfigFileData {
-
-
- @Id private String id;
-
- private String xmlFileName;
-
- private String xmlContent;
-
- public String getXmlFileName() {
- return xmlFileName;
- }
-
- public void setXmlFileName(String xmlFileName) {
- this.xmlFileName = xmlFileName;
- }
-
- public String getXmlContent() {
- return xmlContent;
- }
-
- public void setXmlContent(String xmlContent) {
- this.xmlContent = xmlContent;
- }
-
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java
deleted file mode 100644
index 625b021..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.exception;
-
-/**
- * Exception when unable to connect to Config file Disk repository
- *
- * @author kmalbari
- *
- */
-public class ConfigFileReadException extends VesException {
-
- /**
- *
- */
- private static final long serialVersionUID = 414953072485703000L;
-
- public ConfigFileReadException(String exceptionMessage) {
- super(exceptionMessage);
- }
-
- public ConfigFileReadException(String exceptionMessage, Exception exception) {
- super(exceptionMessage, exception);
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
index 7055bc0..1f642b2 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
@@ -28,9 +28,6 @@ package org.onap.universalvesadapter.exception;
*/
public class ConfigFileSmooksConversionException extends VesException {
- /**
- *
- */
private static final long serialVersionUID = 7128340575013771888L;
public ConfigFileSmooksConversionException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
index 7a35f83..3b30fdd 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
@@ -19,17 +19,18 @@
*/
package org.onap.universalvesadapter.exception;
+import org.slf4j.Logger;
+
+
/**
* Exception generated when dealing with communication to DMaap MR API
*
* @author kmalbari
*
*/
-public class DMaapException extends VesException {
+public class DMaapException extends RuntimeException {
- /**
- *
- */
+
private static final long serialVersionUID = 7045766597511192878L;
public DMaapException(String string) {
@@ -39,6 +40,26 @@ public class DMaapException extends VesException {
public DMaapException(String string, Exception exception) {
super(string, exception);
}
+
+ /**
+ * @param message - Error Message for Exception
+ * @param cause - Actual Exception which caused {@link DMaapException}
+ */
+ public DMaapException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Creates and logs the DCAE Runtime Exception to given logger
+ *
+ * @param message - Error Message for Exception and logging
+ * @param logger - Logger used for logging exception
+ * @param cause - Actual exception which caused {@link DMaapException}
+ */
+ public DMaapException(String message, Logger logger, Throwable cause) {
+ super(message, cause);
+ logger.error(message);
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
index 3dfa034..b4ebcc5 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
@@ -27,9 +27,6 @@ package org.onap.universalvesadapter.exception;
*/
public class MapperConfigException extends VesException {
- /**
- *
- */
private static final long serialVersionUID = -7876042513908918292L;
public MapperConfigException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
index fd11b89..12c74cb 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
@@ -28,10 +28,6 @@ package org.onap.universalvesadapter.exception;
*/
public class VesException extends Exception {
-
- /**
- *
- */
private static final long serialVersionUID = -8549819066568432382L;
public VesException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
index e34b98a..06d8249 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
@@ -130,24 +130,7 @@ public class Evaluation {
public String toString() {
return new ToStringBuilder(this).append("operand", operand).append("field", field).append("value", value).append("datatype", datatype).append("lhs", lhs).append("rhs", rhs).append("additionalProperties", additionalProperties).toString();
}
- /*
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(field).append(additionalProperties).append(value).append(rhs).append(datatype).append(operand).append(lhs).toHashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if ((other instanceof Evaluation) == false) {
- return false;
- }
- Evaluation rhs = ((Evaluation) other);
- return new EqualsBuilder().append(field, rhs.field).append(additionalProperties, rhs.additionalProperties).append(value, rhs.value).append(rhs, rhs.rhs).append(datatype, rhs.datatype).append(operand, rhs.operand).append(lhs, rhs.lhs).isEquals();
- }
-*/
+
@Override
public int hashCode() {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
index 1e6006a..43766ef 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
@@ -35,11 +35,6 @@ import org.springframework.stereotype.Component;
@Component
public class AdapterService {
- /*
- * @Autowired private UniversalEventAdapter snmpTrapEventAdapter; public
- * GenericAdapter identifyIncomingJsonFormatAndReturnAdapter() { return
- * snmpTrapEventAdapter; }
- */
/**
* Identifies eventype by parsing the incoming json file.
@@ -52,7 +47,6 @@ public class AdapterService {
*/
public String identifyEventTypeFromIncomingJson(String incomingJsonString) throws MapperConfigException {
- // TODO A proper logic to identify diffeent events is needed here
return MapperConfigUtils.checkIncomingJsonForMatchingDomain(incomingJsonString);
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java
deleted file mode 100644
index bf45a1b..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-
-/**
- * A contract defined for services that will handle the operations of config file.
- *
- * @author kmalbari
- *
- */
-public interface ConfigFileService {
-
- /**
- * Returns the config file data.
- *
- * @param fileName file name
- * @return config file content
- * @throws ConfigFileReadException if unable to read config file
- */
- String readConfigFile(String fileName) throws ConfigFileReadException;
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
index e463a28..04f333e 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
@@ -19,226 +19,96 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration;
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.exception.VesException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- *
- * This service will handle all the communication with the DMaap MR API
- *
- *
- * @author kmalbari
- *
- */
@Component
public class DMaapService {
- private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private DMaapMrUrlConfiguration dmaapMrUrlObject;
-
- private MRConsumer consumer;
+ private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
+ private static List<String> list = new LinkedList<String>();
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+
+ /**
+ * It fetches events from DMaap in JSON, transforms JSON to VES format and
+ * publishes it to outgoing DMaap MR Topic
+ *
+ * @param DMaaPMRSubscriber,DMaaPMRPublisher
+ * @return
+ */
+ public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater)
+ throws InterruptedException {
+ LOGGER.info("fetch and publish from and to Dmaap started");
+
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) {
+ list.add(incomingJsonString);
+
+ }
+
+ if (list.size() == 0) {
+ Thread.sleep(2000);
+ }
+ LOGGER.debug("number of messages to be converted :{}", list.size());
+
+ if (list.size() != 0) {
+ String val = ((LinkedList<String>) list).removeFirst();
+ List<String> messages = new ArrayList<>();
+ String vesEvent = processReceivedJson(val);
+ if (!(vesEvent.isEmpty() || vesEvent.equals(null) || vesEvent.equals(""))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+ LOGGER.info("Message successfully published to DMaaP Topic");
+ }
+
+ }
+
+ }
+ }
+ }
+
+ /**
+ * It finds mapping file for received json, transforms json to VES format
+ *
+ * @param incomingJsonString
+ * @return
+ */
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+ /*
+ * For Future events String eventType =
+ * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
+ *
+ * LOGGER.debug("Event identified as " + eventType);
+ */
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp");
+
+ } catch (VesException exception) {
+ LOGGER.error("Received exception : " + exception.getMessage(), exception);
+ LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ LOGGER.error("Received exception : ", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
- private MRBatchingPublisher publisher;
-
- private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>();
-
- /**
- * Adds message to outgoing queue that will be sent to DMaaP topic.
- *
- * @param message outbound message in VES format
- */
- public void addMessageInOutgoingQueue(String message) {
- if (null != message && !"".equals(message)) {
- outgoingMessageQueue.add(message);
- eLOGGER.debug("Added message to outgoing queue " + message);
- }
- }
-
-
-
- /**
- * reads the messages on DMaap MR Topic
- *
- * @return iterable of messages that will be received on DMaap MR Topic
- *
- * @throws DMaapException
- */
- public Iterable<String> consumeFromDMaap() throws DMaapException{
- if(null == consumer){
- try {
- consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties());
- eLOGGER.debug("Created consumer");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception);
- }
- }
-
- try {
- eLOGGER.debug("Returning result fetched by consumer");
- return consumer.fetch();
- } catch (Exception exception) {
- throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception);
- }
-// return () -> Collections.emptyIterator();
-
- }
-
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap() throws DMaapException{
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Create a publisher now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- for(String message : outgoingMessageQueue){
- try {
- publisher.send(message);
- eLOGGER.debug("Sending message to DMaaP :-> " + message );
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- }
- else
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
-
- }
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap(String outgoingMessage) throws DMaapException{
- if(null == publisher){
- synchronized(publisher){
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Publisher created now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- }
- }
- try {
- publisher.send(outgoingMessage);
- eLOGGER.debug("Sent outgoing message " + outgoingMessage);
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- } else{
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
- }
-
- }
-
-
- /**
- * for local testing only
- * @return
- * @throws DMaapException
- */
- /*public String consume() throws DMaapException {
-
- URL url;
- StringBuffer incomingJson = null;
- incomingJson = new StringBuffer();
- try {
- url = new URL(dmaapMrUrlObject.getUrl());
-
- //open the connection to the above URL.
- URLConnection urlcon = url.openConnection();
-
- Map<String, List<String>> header = urlcon.getHeaderFields();
-
- //print all the fields along with their value.
- for (Map.Entry<String, List<String>> mp : header.entrySet()) {
- eLOGGER.debug(mp.getKey() + " : ");
- eLOGGER.debug(mp.getValue().toString());
- }
- eLOGGER.debug("Complete source code of the URL is-");
- eLOGGER.debug("---------------------------------");
-
- //get the inputstream of the open connection.
- BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream()));
- String tempString;
- //print the source code line by line.
- while ((tempString = br.readLine()) != null) {
- eLOGGER.debug(tempString);
- incomingJson.append(tempString);
- }
-
- } catch (MalformedURLException exception) {
- throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception);
- } catch (IOException exception) {
- throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception);
- }
- return incomingJson.toString();
- }*/
-
-
-
-
-
-
-
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java
deleted file mode 100644
index 7c05ced..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.onap.universalvesadapter.configs.DiskRepoConfiguration;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * Implementation of {@code ConfigFileService} using disk repository.
- *
- * @author kmalbari
- *
- */
-@Component
-public class DiskRepoConfigFileService implements ConfigFileService {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private DiskRepoConfiguration diskRepoConfiguration;
-
- private RestTemplate restTemplate;
-
- private URI uri = null;
-
- /* (non-Javadoc)
- * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String)
- */
- @Override
- public String readConfigFile(String fileName) throws ConfigFileReadException {
- logger.debug("Reading config file for " + fileName);
- if (null == uri) {
- try {
- uri = new URI(diskRepoConfiguration.getFileRepositoryUrl() + fileName);
- logger.debug("Read URI for " + fileName);
- } catch (URISyntaxException exception) {
- throw new ConfigFileReadException("Unable to read config file for file "
- + fileName + "\n Reason : " + exception.getMessage(), exception);
- }
- }
- logger.debug("Calling file repo service for URI" + uri);
- ResponseEntity<String> fileDataEntity = getRestTemplate().getForEntity(uri, String.class);
- logger.debug("Call completed successfully");
- return fileDataEntity.getBody();
- }
-
- /**
- * Instantiates the instance if null and returns it.
- *
- * @return {@code RestTemplate} instance
- */
- private RestTemplate getRestTemplate(){
-
- if (null == restTemplate) {
- restTemplate = new RestTemplate();
- }
-
- return restTemplate;
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java
deleted file mode 100644
index 77769f5..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import org.onap.universalvesadapter.domain.ConfigFileData;
-import org.springframework.stereotype.Component;
-
-/**
- * Service to use mongo db as config file repository
- *
- * @author kmalbari
- *
- */
-@Component
-public class MongoDbConfigFileService implements ConfigFileService {
-
- /* (non-Javadoc)
- * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String)
- */
- public String readConfigFile(String configFileName){
- //HERE CONFIG FILE DATA WOULD COME FROM MONGO DB
- ConfigFileData configFileData = new ConfigFileData();
- configFileData.setXmlFileName(configFileName);
- configFileData.setXmlContent("<?xml version=\"1.0\" encoding=\"UTF-8\"?> "
- + "<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" "
- + "xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" "
- + " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> "
- + " <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> "
- + " </json:reader> "
- + "</smooks-resource-list>");
- return configFileData.getXmlContent();
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
new file mode 100644
index 0000000..f92511e
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.service;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Component;
+
+//AdapterInitializer
+@Component
+public class VESAdapterInitializer implements CommandLineRunner, Ordered {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VESAdapterInitializer.class);
+ @Value("${spring.datasource.url}")
+ String dBurl;
+ @Value("${spring.datasource.username}")
+ String user;
+ @Value("${spring.datasource.password}")
+ String pwd;
+
+ private static Map<String, String> mappingFiles = new HashMap<String, String>();
+ private static Map<String, String> env;
+ private static String url;
+ public static String retString;
+ public static String retCBSString;
+
+ @Override
+ public void run(String... args) throws Exception {
+
+ fetchMappingFile();
+ getconsul();
+
+ }
+
+ private void getconsul() {
+
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ LOGGER.info(entry.getKey() + ":" + entry.getValue());
+ }
+
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+ LOGGER.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ retString = executecurl(url);
+
+ } else {
+
+
+
+ LOGGER.info(">>>Static configuration to be used");
+
+
+ }
+
+ }
+
+ private String executecurl(String url) {
+
+ String[] command = { "curl", "-v", url };
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ LOGGER.info(result);
+
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ LOGGER.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
+
+ }
+
+ public void fetchMappingFile() {
+ try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {
+ LOGGER.info("Retrieving data from DB");
+ PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file");
+ ResultSet rs = pstmt.executeQuery();
+ // parsing the column each time is a linear search
+ int column1Pos = rs.findColumn("enterpriseid");
+ int column2Pos = rs.findColumn("mappingfilecontents");
+ String hexString;
+ while (rs.next()) {
+ String column1 = rs.getString(column1Pos);
+ String column2 = rs.getString(column2Pos);
+ hexString = column2.substring(2);
+ byte[] bytes = Hex.decodeHex(hexString.toCharArray());
+ String data = new String(bytes, "UTF-8");
+ mappingFiles.put(column1, data);
+ }
+ LOGGER.info("DB Initialization Completed..." + mappingFiles.size());
+ } catch (Exception e) {
+ LOGGER.error("Error occured due to :" + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ public static Map<String, String> getMappingFiles() {
+ return mappingFiles;
+ }
+
+ public static void setMappingFiles(Map<String, String> mappingFiles) {
+ VESAdapterInitializer.mappingFiles = mappingFiles;
+ }
+
+ @Override
+ public int getOrder() {
+ return 1;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 112d1d6..ed590a8 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,28 +19,14 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-
-import javax.annotation.Resource;
-
-import org.milyn.io.FileUtils;
-import org.onap.universalvesadapter.adapter.GenericAdapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
-import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-import org.onap.universalvesadapter.utils.ParallelTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
-import org.springframework.util.FileCopyUtils;
/**
* Service that starts the universal ves adapter module to listen for events
@@ -50,128 +36,51 @@ import org.springframework.util.FileCopyUtils;
*/
@Component
public class VesService {
-
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- private boolean isRunning = true;
-
- @Autowired
- private ConfigurableApplicationContext ctx;
-
- @Autowired
- private DMaapService dmaapService;
-
- @Autowired
- private AdapterService adapterService;
-
- @Resource(name = "universalEventAdapter")
- private GenericAdapter eventAdapter;
- @Value("${messagesInBatch}")
- private int messagesInBatch;
+ private final Logger LOGGER = LoggerFactory.getLogger(VesService.class);
+
+ private boolean isRunning = true;
+
+ @Autowired
+ private DMaapService dmaapService;
+
+ @Autowired
+ private Creator creator;
+
- @Value("${messagesInTimeInterval}")
- private long messagesInTimeInverval;
-
- @Value("${mapperConfig.file}")
- private String mapperConfigFile;
-
- /*public void start(){
-
- String incomingJsonString = dmaapService.consume();
- if(!"".equals(incomingJsonString)){
- GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter();
- String outgoingJsonString = eventAdapter.transform(incomingJsonString);
- System.out.println(outgoingJsonString);
- }
- }*/
-
-
- /**
- * method triggers universal ves adapter module.
- */
- public void start() {
-
- try {
- String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile));
- MapperConfigUtils.readMapperConfigFile(mappingConfigFileData);
-
-
- ParallelTasks parallelTasks = new ParallelTasks();
- while (isRunning) {
- int processingNumberOfMessage = 0;
- long start = System.currentTimeMillis();
- for (String incomingJsonString : dmaapService.consumeFromDMaap()) {
- parallelTasks.add(() -> processReceivedJson(incomingJsonString));
- processingNumberOfMessage++;
- if (processingNumberOfMessage == messagesInBatch
- || (System.currentTimeMillis() - start) > messagesInTimeInverval) {
- processingNumberOfMessage = 0;
- start = System.currentTimeMillis();
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
- }
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
-
- /*String incomingJsonString = "";
- incomingJsonString = dmaapService.consume();
- processReceivedJson(incomingJsonString);*/
- } catch (Exception exception) {
- LOGGER.error("Reported exception : " + exception.getMessage(), exception);
- }
- }
+ /**
+ * method triggers universal VES adapter module.
+ */
+ public void start() throws MapperConfigException {
+ LOGGER.debug("Creating Subcriber and Publisher with creator.............");
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber();
- /**
- * It finds mapping file for received json, transforms json to VES format
- * and publishes it to outgoing DMaap MR Topic
- *
- * @param incomingJsonString
- */
- private void processReceivedJson(String incomingJsonString){
- LOGGER.debug("Received incoming message : " + incomingJsonString);
- if (!"".equals(incomingJsonString)) {
- String eventType;
- try {
- eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
-
- LOGGER.debug("Event identified as " + eventType);
- String outgoingJsonString;
- outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType);
- LOGGER.debug("Output VES json to be sent " + outgoingJsonString);
-
-// dmaapService.addMessageInOutgoingQueue(outgoingJsonString);
-// LOGGER.debug("Added message in outgoing Queue ");
-
- dmaapService.publishToDMaap(outgoingJsonString);
- LOGGER.debug("Sent message in outgoing Queue ");
-
-
- } catch (VesException exception) {
- LOGGER.error("Received exception : " + exception.getMessage(), exception);
-
- //TODO KKM : Do we wish to continue the application with same exception in every thread??
- LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
- ctx.close();
- }
- }
- }
-
- /**
- * method stops universal ves adapter module
- */
- public void stop() {
-
- isRunning = false;
- }
+ DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
+
+ // Create subscriber & publisher thread
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOGGER.debug("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
+ dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Start subscriber & publisher thread
+ t1.setName("SNMP-COLLECTOR");
+ t1.start();
+
+ }
+
+ /**
+ * method stops universal ves adapter module
+ */
+ public void stop() {
+ isRunning = false;
+ }
}
+
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
new file mode 100644
index 0000000..34bbc8e
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
@@ -0,0 +1,288 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+package org.onap.universalvesadapter.utils;
+
+import javax.validation.constraints.NotEmpty;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.stereotype.Component;
+
+@Component
+@PropertySource(value = {"classpath:application.properties","classpath:DMaapMR.properties"})
+@ConfigurationProperties
+public class DmaapConfig {
+
+ @Value("${mr.hostname}")
+ @NotEmpty
+ private String hostname;
+
+ // default port number
+ @Value("${mr.DEFAULT_PORT_NUMBER}")
+ @NotEmpty
+ private int DEFAULT_PORT_NUMBER;
+
+ // default to no username
+ @Value("${mr.DEFAULT_USER_NAME}")
+ private String DEFAULT_USER_NAME;
+
+
+ //defaults to no userPassword
+ @Value("${mr.DEFAULT_USER_PASSWORD}")
+ private String DEFAULT_USER_PASSWORD;
+
+ //defaults to using https protocol
+ @Value("${mr.DEFAULT_PROTOCOL}")
+ @NotEmpty
+ private String DEFAULT_PROTOCOL;
+
+ //defaults to json content type
+ @Value("${mr.DEFAULT_CONTENT_TYPE}")
+ @NotEmpty
+ private String DEFAULT_CONTENT_TYPE;
+
+ @Value("${mr.DMAAP_URI_PATH_PREFIX}")
+ @NotEmpty
+ private String DMAAP_URI_PATH_PREFIX;
+
+ @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}")
+ @NotEmpty
+ private String DMAAP_DEFAULT_CONSUMER_ID;
+
+ @Value("${mr.DMAAP_GROUP_PREFIX}")
+ @NotEmpty
+ private String DMAAP_GROUP_PREFIX;
+
+ // Publisher Constants
+
+ //Dmaap Publisher Topic
+ @Value("${mr.publisher.topic}")
+ @NotEmpty
+ private String publisherTopic;
+
+ //disable batching by default
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+
+ //default recovery messages size
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+
+//number of retries when flushing messages
+ @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+
+ //delay in retrying for flushing messages
+ @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+
+ // Subscriber Constants
+
+ //Dmaap Subcriber Topic
+ @Value("${mr.subscriber.topic}")
+ @NotEmpty
+ private String subscriberTopic;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
+ @NotEmpty
+ private int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
+ @NotEmpty
+ private int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
+ @NotEmpty
+ private String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+
+ @Value("${mr.subcriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+
+ @Value("${mr.subcriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getDEFAULT_PORT_NUMBER() {
+ return DEFAULT_PORT_NUMBER;
+ }
+
+ public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) {
+ DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER;
+ }
+
+ public String getDEFAULT_USER_NAME() {
+ return DEFAULT_USER_NAME;
+ }
+
+ public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) {
+ DEFAULT_USER_NAME = dEFAULT_USER_NAME;
+ }
+
+ public String getDEFAULT_USER_PASSWORD() {
+ return DEFAULT_USER_PASSWORD;
+ }
+
+ public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) {
+ DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD;
+ }
+
+ public String getDEFAULT_PROTOCOL() {
+ return DEFAULT_PROTOCOL;
+ }
+
+ public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) {
+ DEFAULT_PROTOCOL = dEFAULT_PROTOCOL;
+ }
+
+ public String getDEFAULT_CONTENT_TYPE() {
+ return DEFAULT_CONTENT_TYPE;
+ }
+
+ public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) {
+ DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE;
+ }
+
+ public String getDMAAP_URI_PATH_PREFIX() {
+ return DMAAP_URI_PATH_PREFIX;
+ }
+
+ public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) {
+ DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX;
+ }
+
+ public String getDMAAP_DEFAULT_CONSUMER_ID() {
+ return DMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) {
+ DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public String getDMAAP_GROUP_PREFIX() {
+ return DMAAP_GROUP_PREFIX;
+ }
+
+ public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) {
+ DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX;
+ }
+
+ public String getPublisherTopic() {
+ return publisherTopic;
+ }
+
+ public void setPublisherTopic(String publisherTopic) {
+ this.publisherTopic = publisherTopic;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(
+ int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public String getSubscriberTopic() {
+ return subscriberTopic;
+ }
+
+ public void setSubscriberTopic(String subscriberTopic) {
+ this.subscriberTopic = subscriberTopic;
+ }
+
+ public int getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
+ return subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
+ this.subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public int getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
+ return subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
+ this.subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public String getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
+ return subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
+ this.subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public String getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
+ return subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public void setSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
+ this.subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public String getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
+ return subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+ public void setSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
+ this.subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java
new file mode 100644
index 0000000..02632f4
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java
@@ -0,0 +1,62 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.utils;
+
+/**
+ * Contains common utils to check HTTP Related Utils
+ *
+ * @author Rajiv Singla . Creation Date: 11/2/2016.
+ */
+public abstract class HTTPUtils {
+
+ /**
+ * HTTP Status code for successful HTTP call
+ */
+ public static final Integer HTTP_SUCCESS_STATUS_CODE = 200;
+
+ /**
+ * HTTP Response code when request has been accepted for processing, but the processing has not been completed
+ */
+ public static final Integer HTTP_ACCEPTED_RESPONSE_CODE = 202;
+
+ /**
+ * HTTP Response code when there is no content
+ */
+ public static final Integer HTTP_NO_CONTENT_RESPONSE_CODE = 204;
+
+
+ public static final String JSON_APPLICATION_TYPE = "application/json";
+
+
+ private HTTPUtils() {
+
+ }
+
+ /**
+ * Checks if HTTP Status code is less than or equal to 200 but less then 300
+ *
+ * @param statusCode http status code
+ * @return true if response code between 200 and 300
+ */
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
index 4e67ed6..3d1907a 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
@@ -19,18 +19,22 @@
*/
package org.onap.universalvesadapter.utils;
-import com.att.aft.dme2.internal.apache.commons.lang3.EnumUtils;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
+
import org.onap.universalvesadapter.exception.MapperConfigException;
import org.onap.universalvesadapter.mappingconfig.Entry;
import org.onap.universalvesadapter.mappingconfig.Evaluation;
import org.onap.universalvesadapter.mappingconfig.MapperConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
/**
+ * This class will be used in Future for different telemetry data:
* This class will be utility class to read the mapper config file and parse the
* config to prepare the grammar to detect the incoming json's event type.
*
@@ -39,6 +43,7 @@ import org.onap.universalvesadapter.mappingconfig.MapperConfig;
*/
public class MapperConfigUtils {
+ private final Logger LOGGER = LoggerFactory.getLogger(MapperConfigUtils.class);
private static Set<Entry> entries = new TreeSet<>((o1, o2) -> o1.getPriority().compareTo(o2.getPriority()));
private enum JoinOperator {
@@ -212,6 +217,7 @@ public class MapperConfigUtils {
exception);
}
System.out.println("Read config file content into :" + config);
+
if (null != config) {
entries.addAll(config.getEntries());
} else {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java
deleted file mode 100644
index 45fdf96..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- *
- * Utility class to execute parallel tasks
- *
- * @author kmalbari
- *
- */
-public class ParallelTasks
-{
- private final Collection<Runnable> tasks = new ArrayList<Runnable>();
-
- public ParallelTasks()
- {
- }
-
- /**
- *
- * Add task to be executed in parallel
- *
- * @param task
- */
- public void add(final Runnable task)
- {
- tasks.add(task);
- }
-
- /**
- * starts all the added tasks in parallel
- *
- * @throws InterruptedException
- */
- public void startParallelTasks() throws InterruptedException
- {
- final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
- .availableProcessors());
- try
- {
- final CountDownLatch latch = new CountDownLatch(tasks.size());
- for (final Runnable task : tasks)
- threads.execute(new Runnable() {
- public void run()
- {
- try
- {
- task.run();
- }
- finally
- {
- latch.countDown();
- }
- }
- });
- latch.await();
- }
- finally
- {
- threads.shutdown();
- }
- }
-} \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
index 8c17dc2..1c38783 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
* @author kmalbari
*
*/
+
+
public class SmooksUtils {
@@ -53,23 +55,19 @@ public class SmooksUtils {
*/
public static VesEvent getTransformedObjectForInput(Smooks smooks, String incomingJsonString) {
- LOGGER.debug("Transforming json " + incomingJsonString);
+ LOGGER.info("Transforming incoming json " );
ExecutionContext executionContext = smooks.createExecutionContext();
- LOGGER.debug("Context created");
+ LOGGER.info("Context created");
Locale defaultLocale = Locale.getDefault();
Locale.setDefault(new Locale("en", "IE"));
StringResult result = new StringResult();
- // Configure the execution context to generate a report...
-// executionContext.setEventListener(new HtmlReportGenerator("target/report/report.html"));
-
- // Filter the input message to the outputWriter, using the execution context...
smooks.filterSource(executionContext, new StreamSource(new ByteArrayInputStream(incomingJsonString.getBytes(StandardCharsets.UTF_8))), result);
- LOGGER.debug("Transformed incoming json now");
+
Locale.setDefault(defaultLocale);
VesEvent vesEvent = (VesEvent) executionContext.getBeanContext().getBean("vesEvent");
- LOGGER.debug("Converted vesEvent from incoming json");
+ LOGGER.debug("consversion successful to VES Event");
return vesEvent;
}
diff --git a/UniversalVesAdapter/src/main/resources/DMaapMR.properties b/UniversalVesAdapter/src/main/resources/DMaapMR.properties
new file mode 100644
index 0000000..ae96248
--- /dev/null
+++ b/UniversalVesAdapter/src/main/resources/DMaapMR.properties
@@ -0,0 +1,45 @@
+ # DMaaP Config Constants
+ #
+ #default hostname
+
+ mr.hostname=10.53.172.156
+ # default port number
+ mr.DEFAULT_PORT_NUMBER=3904
+ # default to no username
+ mr.DEFAULT_USER_NAME=null
+ # defaults to no userPassword
+ mr.DEFAULT_USER_PASSWORD=null
+ #defaults to using https protocol
+ mr.DEFAULT_PROTOCOL=http
+ #defaults to json content type
+ mr.DEFAULT_CONTENT_TYPE=application/json
+
+ mr.DMAAP_URI_PATH_PREFIX=/events/
+ mr.DMAAP_DEFAULT_CONSUMER_ID=con2
+ mr.DMAAP_GROUP_PREFIX=grp2
+
+ # // ================== DMaaP MR Constants ============================== //
+
+ # ///////////////// Publisher Constants
+ #Dmaap Publisher Topic
+ mr.publisher.topic=unauthenticated.SEC_FAULT_OUTPUT
+ #mr.publisher.topic=TEST-TOPIC1
+ #mr.publisher.topic=unauthenticated.SEC_MEASUREMENT_OUTPUT
+ #disable batching by default
+ mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE=1
+ # default recovery messages size
+ mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE=100000
+ #number of retries when flushing messages
+ mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE=5
+ #delay in retrying for flushing messages
+ mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE=5000
+ #////////////////// Subscriber Constants
+ #Dmaap Subcriber Topic
+ #mr.subscriber.topic=ONAP-COLLECTOR-SNMPTRAP
+ mr.subscriber.topic=ONAP-COLLECTOR-SNMPTRAP
+ #mr.subscriber.topic=TEST-TOPIC2
+ mr.subcriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS=-1
+ mr.subcriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT=-1
+ mr.subcriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX=grp2
+ mr.subcriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME=timeout
+ mr.subcriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME=limit \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/resources/application.properties b/UniversalVesAdapter/src/main/resources/application.properties
index 4841f4a..c2dec51 100644
--- a/UniversalVesAdapter/src/main/resources/application.properties
+++ b/UniversalVesAdapter/src/main/resources/application.properties
@@ -1,11 +1,18 @@
+server.port=8085
+
logging.level.org.springframework.web=ERROR
-logging.level.org.onap.universalvesadapter=DEBUG
-dmaap.url=http://localhost:8080/greeting12
-snmpTrap.configFile=snmpTrapToVes.xml
+defaultMappingFileName=defaultSnmpMappingFile
universal.configFiles=snmp:snmpTrapToVes.xml,default:defaultConfig.xml
-fileService.url=http://localhost:8888/fileAsString/
-messagesInBatch=1000
-messagesInTimeInterval=5000
+
mapperConfig.file=../UniversalVesAdapter/src/main/resources/MapperConfig.json
-dmaap.consumer_props=../UniversalVesAdapter/src/main/resources/dme2/consumer.properties
-dmaap.publisher_props=../UniversalVesAdapter/src/main/resources/dme2/publisher.properties \ No newline at end of file
+dmaap.mr_props=DMaapMR.properties
+
+#DEV Machine DB Details
+spring.datasource.url=jdbc:postgresql://10.49.16.19:5432/dummy
+spring.datasource.username=postgres
+spring.datasource.password=root
+
+#Lab Details
+#spring.datasource.url=jdbc:postgresql://10.53.172.129:5432/dummy
+#spring.datasource.username=ngpuser
+#spring.datasource.password=root \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/resources/dme2/consumer.properties b/UniversalVesAdapter/src/main/resources/dme2/consumer.properties
deleted file mode 100644
index 6ba59d1..0000000
--- a/UniversalVesAdapter/src/main/resources/dme2/consumer.properties
+++ /dev/null
@@ -1,61 +0,0 @@
-<!DOCTYPE HTML>
-<!DOCTYPE html PUBLIC "" ""><HTML><HEAD>
-<META http-equiv="Content-Type" content="text/html; charset=utf-8"></HEAD>
-<BODY>
-<PRE>###############################################################################
-# ============LICENSE_START=======================================================
-# org.onap.dmaap
-# ================================================================================
-# Copyright � 2017 AT&amp;T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&amp;T Intellectual Property.
-#
-###############################################################################
-TransportType=DME2
-Latitude =47.778998
-Longitude =-122.182883
-Version =1.0
-ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
-Environment =TEST
-Partner=BOT_R
-routeOffer=MR1
-SubContextPath =/
-Protocol =http
-MethodType =GET
-username =&lt;att uid&gt;
-password =&lt;password&gt;
-contenttype =application/json
-authKey=&lt;auth key&gt;
-authDate=2016-02-18T13:57:37-0800
-#host=uebsb91bodc.it.att.com:3904
-host=&lt;host&gt;:&lt;port&gt;
-topic=com.att.ecomp_test.crm.preDemo1
-group=con
-id=5
-timeout=15000
-limit=1000
-filter=
-AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
-AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
-AFT_DME2_REQ_TRACE_ON=true
-AFT_ENVIRONMENT=AFTUAT
-AFT_DME2_EP_CONN_TIMEOUT=15000
-AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
-AFT_DME2_EP_READ_TIMEOUT_MS=50000
-sessionstickinessrequired=NO
-DME2preferredRouterFilePath=/src/main/resources/dme2/preferredRoute.txt
-
-
-</PRE></BODY></HTML>
diff --git a/UniversalVesAdapter/src/main/resources/dme2/preferredRoute.properties b/UniversalVesAdapter/src/main/resources/dme2/preferredRoute.properties
deleted file mode 100644
index 506df76..0000000
--- a/UniversalVesAdapter/src/main/resources/dme2/preferredRoute.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-<!DOCTYPE HTML>
-<!DOCTYPE html PUBLIC "" ""><HTML><HEAD>
-<META http-equiv="Content-Type" content="text/html; charset=utf-8"></HEAD>
-<BODY>
-<PRE>preferredRouteKey=MR1</PRE></BODY></HTML>
diff --git a/UniversalVesAdapter/src/main/resources/dme2/producer.properties b/UniversalVesAdapter/src/main/resources/dme2/producer.properties
deleted file mode 100644
index fb9e639..0000000
--- a/UniversalVesAdapter/src/main/resources/dme2/producer.properties
+++ /dev/null
@@ -1,59 +0,0 @@
-<!DOCTYPE HTML>
-<!DOCTYPE html PUBLIC "" ""><HTML><HEAD>
-<META http-equiv="Content-Type" content="text/html; charset=utf-8"></HEAD>
-<BODY>
-<PRE>###############################################################################
-# ============LICENSE_START=======================================================
-# org.onap.dmaap
-# ================================================================================
-# Copyright � 2017 AT&amp;T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&amp;T Intellectual Property.
-#
-###############################################################################
-TransportType=DME2
-Latitude =47.778998
-Longitude =-122.182883
-Version =1.0
-ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
-#com.att.acsi.saat.dt.dmaap.dev.mrclientnew1
-Environment =TEST
-Partner=BOT_R
-routeOffer=MR1
-SubContextPath =/
-Protocol =http
-MethodType =POST
-username =&lt;att uid&gt;
-password =&lt;global logon password&gt;
-contenttype = application/json
-authKey=&lt;auth key&gt;
-authDate=2016-07-20T11:30:56-0700
-host=&lt;host&gt;:&lt;port&gt;
-topic=com.att.ecomp_test.crm.preDemo1
-#host=uebsb91bodc.it.att.com:3904
-partition=1
-maxBatchSize=100
-maxAgeMs=250
-AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
-AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
-AFT_DME2_REQ_TRACE_ON=true
-AFT_ENVIRONMENT=AFTUAT
-AFT_DME2_EP_CONN_TIMEOUT=15000
-AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
-AFT_DME2_EP_READ_TIMEOUT_MS=50000
-sessionstickinessrequired=NO
-DME2preferredRouterFilePath=/src/main/resources/dme2/preferredRoute.txt
-MessageSentThreadOccurance=50
-</PRE></BODY></HTML>
diff --git a/UniversalVesAdapter/src/main/resources/logback.xml b/UniversalVesAdapter/src/main/resources/logback.xml
new file mode 100644
index 0000000..c08cd77
--- /dev/null
+++ b/UniversalVesAdapter/src/main/resources/logback.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <property name="DEV_HOME" value="logs" />
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <layout class="ch.qos.logback.classic.PatternLayout">
+ <Pattern>
+ %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
+ </Pattern>
+ </layout>
+ </appender>
+
+ <appender name="FILE-AUDIT"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${DEV_HOME}/debug.log</file>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <Pattern>
+ %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
+ </Pattern>
+ </encoder>
+
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <!-- rollover daily -->
+ <fileNamePattern>${DEV_HOME}/archived/debug.%d{yyyy-MM-dd}.%i.log
+ </fileNamePattern>
+ <timeBasedFileNamingAndTriggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>10MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+
+ </appender>
+
+ <appender name="FILE-ERROR"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>ERROR</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <file>${DEV_HOME}/DroppedEvents.log</file>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <Pattern>
+ %d{yyyy-MM-dd HH:mm:ss} %logger{36} - %msg%n
+ </Pattern>
+ </encoder>
+
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <!-- rollover daily -->
+ <fileNamePattern>${DEV_HOME}/archived/DroppedEvents.%d{yyyy-MM-dd}.%i.log
+ </fileNamePattern>
+ <timeBasedFileNamingAndTriggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>10MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+
+ </appender>
+
+ <!-- Send logs to both console and file audit -->
+ <logger name="org.onap.universalvesadapter" level="debug" additivity="false">
+ <appender-ref ref="FILE-AUDIT" />
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="org.onap.universalvesadapter.adapter" level="debug" additivity="false">
+ <appender-ref ref="FILE-AUDIT" />
+ <appender-ref ref="FILE-ERROR" />
+ <appender-ref ref="STDOUT" />
+ </logger> -
+ <!-- <logger name="org.onap.universalvesadapter.adapter" level="error" additivity="false">
+ <appender-ref ref="FILE-ERROR" />
+ </logger> -->
+
+
+
+
+
+</configuration> \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/resources/snmptovesTest.xml b/UniversalVesAdapter/src/main/resources/snmptovesTest.xml
deleted file mode 100644
index 5fad2c9..0000000
--- a/UniversalVesAdapter/src/main/resources/snmptovesTest.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<smooks-resource-list
- xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
- xmlns:json="http://www.milyn.org/xsd/smooks/json-1.1.xsd"
- xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.2.xsd">
- <json:reader rootName="simple" keyWhitspaceReplacement="-">
- </json:reader>
- <!-- <jb:bean class="com.example.demo.Simple" beanId="simple" createOnElement="simple">
- <jb:value property="orderId" data="#/orderId" />
- <jb:value property="username" data="#/username" />
- <jb:wiring property="customer" beanIdRef="customer"/>
- <jb:wiring property="orderItems" beanIdRef="orderItems"/>
- </jb:bean> -->
-
- <jb:bean class="org.onap.dcaegen2.ves.domain.VesEvent" beanId="vesEvent" createOnElement="simple">
- <jb:wiring property="event" beanIdRef="event"/>
- </jb:bean>
-
- <jb:bean class="org.onap.dcaegen2.ves.domain.Event" beanId="event" createOnElement="simple">
- <jb:wiring property="commonEventHeader" beanIdRef="commonEventHeader"/>
- <jb:wiring property="faultFields" beanIdRef="faultFields"/>
- <!--<jb:wiring property="measurementsForVfScalingFields" beanIdRef="measurementsForVfScalingFields"/> -->
- </jb:bean>
- <!--<jb:bean class="org.onap.dcaegen2.ves.domain.MeasurementsForVfScalingFields" beanId="measurementsForVfScalingFields" createOnElement="simple">
- <jb:wiring property="additionalMeasurements" beanIdRef="additionalMeasurements"/>
- </jb:bean>-->
-
- <jb:bean class="org.onap.dcaegen2.ves.domain.CommonEventHeader" beanId="commonEventHeader" createOnElement="simple">
- <jb:value property="eventId" data="#/community" />
- <jb:value property="eventName" data="#/protocol-version" />
- <jb:value property="domain" data="#/trap-category" />
- <jb:value property="sequence" data="#/time-received" decoder="Long"/>
- <jb:value property="lastEpochMicrosec" data="#/community-len" decoder="Double" />
- <jb:value property="startEpochMicrosec" data="#/notify-OID-len" />
- </jb:bean>
-
- <jb:bean class="org.onap.dcaegen2.ves.domain.FaultFields" beanId="faultFields" createOnElement="simple">
- <jb:value property="alarmCondition" data="#/cambria.partition" />
- <jb:value property="eventSeverity" data="#/notify-OID" />
- <jb:value property="eventSourceType" data="#/agent-name" />
- <jb:value property="specificProblem" data="#/agent-address" />
- <jb:value property="faultFieldsVersion" data="#/epoch_serno" decoder="Double" />
- </jb:bean>
-
-
- <!--<jb:bean class="java.util.ArrayList" beanId="additionalMeasurements" createOnElement="simple">
- <jb:wiring beanIdRef="additionalMeasurement"/>
- </jb:bean>
-
- <jb:bean class="org.onap.dcaegen2.ves.domain.AdditionalMeasurement" beanId="additionalMeasurement" createOnElement="varbinds/element">
- <jb:value property="name" data="#/varbind_value" />
- </jb:bean> -->
-
-</smooks-resource-list> \ No newline at end of file
diff --git a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalFieldTest.java b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalFieldTest.java
index ce4c8c7..e082249 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalFieldTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalFieldTest.java
@@ -19,11 +19,6 @@
*/
package org.onap.dcaegen2.ves.domain;
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.onap.dcaegen2.ves.domain.AdditionalField;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalMeasurementTest.java b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalMeasurementTest.java
index 1b9149b..58a2507 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalMeasurementTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalMeasurementTest.java
@@ -21,7 +21,7 @@ package org.onap.dcaegen2.ves.domain;
import java.util.List;
import java.util.Map;
-import org.onap.dcaegen2.ves.domain.AdditionalMeasurement;
+
import org.junit.Test;
public class AdditionalMeasurementTest {
diff --git a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalObjectTest.java b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalObjectTest.java
index fc28bd5..d3750ed 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalObjectTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalObjectTest.java
@@ -21,7 +21,7 @@ package org.onap.dcaegen2.ves.domain;
import java.util.List;
import java.util.Map;
-import org.onap.dcaegen2.ves.domain.AdditionalInformation;
+
import org.junit.Test;
public class AdditionalObjectTest {
diff --git a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalParameterTest.java b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalParameterTest.java
index 92cda09..9b64ddd 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalParameterTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/dcaegen2/ves/domain/AdditionalParameterTest.java
@@ -19,6 +19,8 @@
*/
package org.onap.dcaegen2.ves.domain;
+import static org.junit.Assert.assertNotNull;
+
import java.util.Map;
import org.junit.Test;
@@ -37,6 +39,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.getCriticality();
+
}
@Test
@@ -57,6 +60,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.getName();
+
}
@Test
@@ -77,6 +81,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.getThresholdCrossed();
+
}
@Test
@@ -97,6 +102,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.getValue();
+
}
@Test
@@ -117,6 +123,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.getAdditionalProperties();
+
}
@Test
@@ -138,6 +145,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.toString();
+
}
@Test
@@ -148,6 +156,7 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.hashCode();
+
}
@Test
@@ -159,5 +168,6 @@ public class AdditionalParameterTest {
// default test
testSubject = createTestSubject();
result = testSubject.equals(other);
+
}
} \ No newline at end of file
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/adapter/UniversalEventAdapterTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/adapter/UniversalEventAdapterTest.java
index bc2cf4f..1f80fde 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/adapter/UniversalEventAdapterTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/adapter/UniversalEventAdapterTest.java
@@ -19,48 +19,26 @@
*/
package org.onap.universalvesadapter.adapter;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.Resource;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.milyn.Smooks;
import org.mockito.InjectMocks;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.onap.dcaegen2.ves.domain.VesEvent;
import org.onap.universalvesadapter.Application;
-import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
-import org.onap.universalvesadapter.exception.MapperConfigException;
import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.service.ConfigFileService;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-import org.onap.universalvesadapter.utils.SmooksUtils;
+import org.onap.universalvesadapter.service.VESAdapterInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.util.FileCopyUtils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=Application.class)
@@ -69,10 +47,9 @@ public class UniversalEventAdapterTest {
private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
@Mock
- private ConfigFileService configFileService;
+ private VESAdapterInitializer vESAdapterInitializer;
@InjectMocks
- @Autowired
private UniversalEventAdapter universalVesAdapter;
@@ -86,7 +63,7 @@ public class UniversalEventAdapterTest {
public void testtransform() {
StringBuffer incomingJsonString = new StringBuffer("{ ")
.append("\"protocol version\":\"v2c\", ")
- .append("\"notify OID\":\".1.3.6.1.4.1.74.2.46.12.1.1AAA\", ")
+ .append("\"notify OID\":\".1.3.6.1.4.1.1751.2.46.12.1.1\", ")
.append("\"cambria.partition\":\"dcae-snmp.client.research.att.com\", ")
.append("\"trap category\":\"UCSNMP-HEARTBEAT\", ")
.append("\"epoch_serno\": 15161177410000, ")
@@ -107,48 +84,140 @@ public class UniversalEventAdapterTest {
.append(" }] ")
.append("}");
- StringBuffer configFileData = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?> ")
- .append("<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" ")
- .append("xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" ")
- .append(" xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> ")
- .append(" <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> ")
- .append(" </json:reader> ")
- .append(" <jb:bean class=\"org.onap.dcaegen2.ves.domain.VesEvent\" beanId=\"vesEvent\" ")
- .append(" createOnElement=\"simple\">")
- .append(" <jb:wiring property=\"event\" beanIdRef=\"event\"/>")
- .append(" </jb:bean> ")
- .append(" <jb:bean class=\"org.onap.dcaegen2.ves.domain.Event\" beanId=\"event\" ")
- .append(" createOnElement=\"simple\"> ")
- .append(" <jb:wiring property=\"commonEventHeader\" beanIdRef=\"commonEventHeader\"/>")
- .append(" <jb:wiring property=\"faultFields\" beanIdRef=\"faultFields\"/> ")
- .append(" </jb:bean> ")
- .append(" <jb:bean class=\"org.onap.dcaegen2.ves.domain.CommonEventHeader\" ")
- .append(" beanId=\"commonEventHeader\" createOnElement=\"simple\"> ")
- .append(" <jb:value property=\"eventId\" data=\"#/community\" /> ")
- .append(" <jb:value property=\"eventName\" data=\"#/protocol-version\" /> ")
- .append(" <jb:value property=\"domain\" data=\"#/trap-category\" /> ")
- .append(" <jb:value property=\"sequence\" data=\"#/time-received\" decoder=\"Long\"/> ")
- .append(" <jb:value property=\"lastEpochMicrosec\" data=\"#/community-len\" decoder=\"Double\" /> ")
- .append(" <jb:value property=\"startEpochMicrosec\" data=\"#/notify-OID-len\" /> ")
- .append(" </jb:bean> ")
- .append(" <jb:bean class=\"org.onap.dcaegen2.ves.domain.FaultFields\" beanId=\"faultFields\"")
- .append(" createOnElement=\"simple\"> <jb:value property=\"alarmCondition\" data=\"#/cambria.partition\" /> ")
- .append(" <jb:value property=\"eventSeverity\" data=\"#/notify-OID\" /> ")
- .append(" <jb:value property=\"eventSourceType\" data=\"#/agent-name\" /> ")
- .append(" <jb:value property=\"specificProblem\" data=\"#/agent-address\" /> ")
- .append(" <jb:value property=\"faultFieldsVersion\" data=\"#/epoch_serno\" decoder=\"Double\" /> ")
- .append(" </jb:bean> ")
- .append("</smooks-resource-list>");
-
- try {
- Mockito.when(configFileService.readConfigFile(Mockito.anyString())).thenReturn(configFileData.toString());
- } catch (Exception e) {
- eLOGGER.error("Error occurred : " + e.getMessage());
- }
-
+ Map<String,String> testMap=new HashMap<String,String>();
+ testMap.put("defaultSnmpMappingFile", "<?xml version=\"1.0\"?>\r\n" +
+ "<smooks-resource-list\r\n" +
+ " xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\"\r\n" +
+ " xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\"\r\n" +
+ " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.4.xsd\">\r\n" +
+ " <json:reader rootName=\"vesevent\" keyWhitspaceReplacement=\"-\">\r\n" +
+ " <json:keyMap>\r\n" +
+ " <json:key from=\"date&amp;time\" to=\"date-and-time\" />\r\n" +
+ " </json:keyMap>\r\n" +
+ " </json:reader>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.VesEvent\" beanId=\"vesEvent\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring property=\"event\" beanIdRef=\"event\"/>\r\n" +
+ " </jb:bean>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.Event\" beanId=\"event\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring property=\"commonEventHeader\" beanIdRef=\"commonEventHeader\"/>\r\n" +
+ " <jb:wiring property=\"faultFields\" beanIdRef=\"faultFields\"/> \r\n" +
+ " <!--<jb:wiring property=\"measurementsForVfScalingFields\" beanIdRef=\"measurementsForVfScalingFields\"/> --> \r\n" +
+ " </jb:bean> \r\n" +
+ " <!--<jb:bean class=\"org.onap.dcaegen2.ves.domain.MeasurementsForVfScalingFields\" beanId=\"measurementsForVfScalingFields\" createOnElement=\"simple\">\r\n" +
+ " <jb:wiring property=\"additionalMeasurements\" beanIdRef=\"additionalMeasurements\"/>\r\n" +
+ " </jb:bean>-->\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.CommonEventHeader\" beanId=\"commonEventHeader\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:expression property=\"version\">\"3.0\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventId\">\"XXXX\"</jb:expression>\r\n" +
+ " <jb:expression property=\"reportingEntityName\">\"VesAdapter\"</jb:expression>\r\n" +
+ " <jb:expression property=\"domain\">\"fault\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventName\" execOnElement=\"vesevent\" >commonEventHeader.domain+\"_\"+commonEventHeader.reportingEntityName +\"_\"+ faultFields.alarmCondition;</jb:expression>\r\n" +
+ " <jb:value property=\"sequence\" data=\"0\" default=\"0\" decoder=\"Long\"/>\r\n" +
+ " <jb:value property=\"lastEpochMicrosec\" data=\"#/time-received\" decoder=\"Double\" />\r\n" +
+ " <jb:value property=\"startEpochMicrosec\" data=\"#/time-received\" decoder=\"Double\"/>\r\n" +
+ " <jb:expression property=\"priority\">\"Medium\"</jb:expression>\r\n" +
+ " <jb:expression property=\"sourceName\">\"VesAdapter\"</jb:expression>\r\n" +
+ " </jb:bean> \r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.FaultFields\" beanId=\"faultFields\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:value property=\"alarmCondition\" data=\"#/trap-category\" />\r\n" +
+ " <jb:expression property=\"eventSeverity\">\"MINOR\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventSourceType\">\"SNMP Agent\"</jb:expression>\r\n" +
+ " <jb:expression property=\"specificProblem\">\"SNMP Fault\"</jb:expression>\r\n" +
+ " <jb:value property=\"faultFieldsVersion\" data=\"2.0\" default=\"2.0\" decoder=\"Double\" />\r\n" +
+ " <jb:wiring property=\"alarmAdditionalInformation\" beanIdRef=\"alarmAdditionalInformationroot\"/> \r\n" +
+ " <jb:expression property=\"vfStatus\">\"Active\"</jb:expression>\r\n" +
+ " \r\n" +
+ " </jb:bean> \r\n" +
+ " <jb:bean class=\"java.util.ArrayList\" beanId=\"alarmAdditionalInformationroot\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring beanIdRef=\"alarmAdditionalInformation\"/>\r\n" +
+ " </jb:bean>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.AlarmAdditionalInformation\" beanId=\"alarmAdditionalInformation\" createOnElement=\"varbinds/element\">\r\n" +
+ " <jb:value property=\"name\" data=\"#/varbind_oid\"/>\r\n" +
+ " <jb:value property=\"value\" data=\"#/varbind_value\" />\r\n" +
+ " </jb:bean>\r\n" +
+ " <!--<jb:bean class=\"java.util.ArrayList\" beanId=\"additionalMeasurements\" createOnElement=\"simple\">\r\n" +
+ " <jb:wiring beanIdRef=\"additionalMeasurement\"/>\r\n" +
+ " </jb:bean> \r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.AdditionalMeasurement\" beanId=\"additionalMeasurement\" createOnElement=\"varbinds/element\">\r\n" +
+ " <jb:value property=\"name\" data=\"#/varbind_value\" />\r\n" +
+ " </jb:bean> --> \r\n" +
+ " \r\n" +
+ "</smooks-resource-list>");
+ testMap.put(".1.3.6.1.4.1.1751.2.46.12", "<?xml version=\"1.0\"?>\r\n" +
+ "<smooks-resource-list\r\n" +
+ " xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\"\r\n" +
+ " xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\"\r\n" +
+ " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.4.xsd\">\r\n" +
+ " <json:reader rootName=\"vesevent\" keyWhitspaceReplacement=\"-\">\r\n" +
+ " <json:keyMap>\r\n" +
+ " <json:key from=\"date&amp;time\" to=\"date-and-time\" />\r\n" +
+ " </json:keyMap>\r\n" +
+ " </json:reader>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.VesEvent\" beanId=\"vesEvent\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring property=\"event\" beanIdRef=\"event\"/>\r\n" +
+ " </jb:bean>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.Event\" beanId=\"event\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring property=\"commonEventHeader\" beanIdRef=\"commonEventHeader\"/>\r\n" +
+ " <jb:wiring property=\"faultFields\" beanIdRef=\"faultFields\"/> \r\n" +
+ " <!--<jb:wiring property=\"measurementsForVfScalingFields\" beanIdRef=\"measurementsForVfScalingFields\"/> --> \r\n" +
+ " </jb:bean> \r\n" +
+ " <!--<jb:bean class=\"org.onap.dcaegen2.ves.domain.MeasurementsForVfScalingFields\" beanId=\"measurementsForVfScalingFields\" createOnElement=\"simple\">\r\n" +
+ " <jb:wiring property=\"additionalMeasurements\" beanIdRef=\"additionalMeasurements\"/>\r\n" +
+ " </jb:bean>-->\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.CommonEventHeader\" beanId=\"commonEventHeader\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:expression property=\"version\">\"3.0\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventId\">\"XXXX\"</jb:expression>\r\n" +
+ " <jb:expression property=\"reportingEntityName\">\"VesAdapter\"</jb:expression>\r\n" +
+ " <jb:expression property=\"domain\">\"fault\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventName\" execOnElement=\"vesevent\" >commonEventHeader.domain+\"_\"+commonEventHeader.reportingEntityName +\"_\"+ faultFields.alarmCondition;</jb:expression>\r\n" +
+ " <jb:value property=\"sequence\" data=\"0\" default=\"0\" decoder=\"Long\"/>\r\n" +
+ " <jb:value property=\"lastEpochMicrosec\" data=\"#/time-received\" decoder=\"Double\" />\r\n" +
+ " <jb:value property=\"startEpochMicrosec\" data=\"#/time-received\" decoder=\"Double\"/>\r\n" +
+ " <jb:expression property=\"priority\">\"Medium\"</jb:expression>\r\n" +
+ " <jb:expression property=\"sourceName\">\"VesAdapter\"</jb:expression>\r\n" +
+ " </jb:bean> \r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.FaultFields\" beanId=\"faultFields\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:value property=\"alarmCondition\" data=\"#/trap-category\" />\r\n" +
+ " <jb:expression property=\"eventSeverity\">\"MINOR\"</jb:expression>\r\n" +
+ " <jb:expression property=\"eventSourceType\">\"SNMP Agent\"</jb:expression>\r\n" +
+ " <jb:expression property=\"specificProblem\">\"SNMP Fault\"</jb:expression>\r\n" +
+ " <jb:value property=\"faultFieldsVersion\" data=\"2.0\" default=\"2.0\" decoder=\"Double\" />\r\n" +
+ " <jb:wiring property=\"alarmAdditionalInformation\" beanIdRef=\"alarmAdditionalInformationroot\"/> \r\n" +
+ " <jb:expression property=\"vfStatus\">\"Active\"</jb:expression>\r\n" +
+ " \r\n" +
+ " </jb:bean> \r\n" +
+ " <jb:bean class=\"java.util.ArrayList\" beanId=\"alarmAdditionalInformationroot\" createOnElement=\"vesevent\">\r\n" +
+ " <jb:wiring beanIdRef=\"alarmAdditionalInformation\"/>\r\n" +
+ " </jb:bean>\r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.AlarmAdditionalInformation\" beanId=\"alarmAdditionalInformation\" createOnElement=\"varbinds/element\">\r\n" +
+ " <jb:value property=\"name\" data=\"#/varbind_oid\"/>\r\n" +
+ " <jb:value property=\"value\" data=\"#/varbind_value\" />\r\n" +
+ " </jb:bean>\r\n" +
+ " <!--<jb:bean class=\"java.util.ArrayList\" beanId=\"additionalMeasurements\" createOnElement=\"simple\">\r\n" +
+ " <jb:wiring beanIdRef=\"additionalMeasurement\"/>\r\n" +
+ " </jb:bean> \r\n" +
+ " \r\n" +
+ " <jb:bean class=\"org.onap.dcaegen2.ves.domain.AdditionalMeasurement\" beanId=\"additionalMeasurement\" createOnElement=\"varbinds/element\">\r\n" +
+ " <jb:value property=\"name\" data=\"#/varbind_value\" />\r\n" +
+ " </jb:bean> --> \r\n" +
+ " \r\n" +
+ "</smooks-resource-list>");
try {
- String actualResult = universalVesAdapter.transform(incomingJsonString.toString(), "snmp");
+
+ VESAdapterInitializer.setMappingFiles(testMap);
+ String actualResult = universalVesAdapter.transform(incomingJsonString.toString(), "snmp");
assertNotNull(actualResult);
assertNotEquals("", actualResult);
} catch (VesException exception) {
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfigurationTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfigurationTest.java
deleted file mode 100644
index c7497b4..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfigurationTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.onap.universalvesadapter.Application;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes=Application.class)
-public class DMaapMrUrlConfigurationTest {
-
- @Autowired
- @InjectMocks
- DMaapMrUrlConfiguration dMaapMrUrlConfiguration = new DMaapMrUrlConfiguration();
-
- @Test
- public void test() {
- String actualdata1 = dMaapMrUrlConfiguration.getConsumerProperties();
- String actualdata2 = dMaapMrUrlConfiguration.getPublisherProperties();
- String actualdata3 = dMaapMrUrlConfiguration.getUrl();
-
- assertEquals("../UniversalVesAdapter/src/main/resources/dme2/consumer.properties", actualdata1);
- assertEquals("../UniversalVesAdapter/src/main/resources/dme2/publisher.properties", actualdata2);
- assertEquals("http://localhost:8080/greeting12", actualdata3);
-
-
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DiskRepoConfigurationTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DiskRepoConfigurationTest.java
deleted file mode 100644
index 4d42641..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/DiskRepoConfigurationTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import static org.junit.Assert.*;
-
-import org.onap.universalvesadapter.Application;
-import org.onap.universalvesadapter.configs.DiskRepoConfiguration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes=Application.class)
-public class DiskRepoConfigurationTest {
-
- @InjectMocks
- @Autowired
- DiskRepoConfiguration diskRepoConfiguration = new DiskRepoConfiguration();
-
- @Test
- public void test() {
-
-
- String actualdata = diskRepoConfiguration.getFileRepositoryUrl();
-
- assertEquals("http://localhost:8888/fileAsString/", actualdata);
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/UniversalEventConfigurationTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/UniversalEventConfigurationTest.java
deleted file mode 100644
index c79caa9..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/configs/UniversalEventConfigurationTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.onap.universalvesadapter.Application;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes=Application.class)
-public class UniversalEventConfigurationTest {
-
- @Autowired
- @InjectMocks
- UniversalEventConfiguration universalEventConfiguration = new UniversalEventConfiguration();
-
- @Test
- public void test() {
- String actualdata = universalEventConfiguration.getConfigForEvent("default");
-
- assertEquals("defaultConfig.xml", actualdata);
-
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/controller/VesControllerTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/controller/VesControllerTest.java
index b35b7b4..582b4fc 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/controller/VesControllerTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/controller/VesControllerTest.java
@@ -22,7 +22,6 @@ package org.onap.universalvesadapter.controller;
import static org.junit.Assert.assertEquals;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.junit.Test;
@@ -34,7 +33,6 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
-import org.springframework.test.web.servlet.ResultActions;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=Application.class)
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/domain/ConfigFileDataTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/domain/ConfigFileDataTest.java
deleted file mode 100644
index 9f51cae..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/domain/ConfigFileDataTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.domain;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class ConfigFileDataTest {
-
- @Autowired
- ConfigFileData configFileData = new ConfigFileData();
- @Test
- public void test() {
- configFileData.setXmlContent("xmlContent");
- configFileData.setXmlFileName("xmlFileName");
-
- assertEquals(configFileData.getXmlContent(), "xmlContent");
- assertEquals(configFileData.getXmlFileName(), "xmlFileName");
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileReadExceptionTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileReadExceptionTest.java
deleted file mode 100644
index 1d5307a..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileReadExceptionTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.exception;
-
-import org.junit.Test;
-
-public class ConfigFileReadExceptionTest {
-
-
- @Test
- public void test() {
- ConfigFileReadException se = new ConfigFileReadException("message");
- ConfigFileReadException se1 = new ConfigFileReadException("message", se);
-
-
- }
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionExceptionTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionExceptionTest.java
index 2fda822..ac60be1 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionExceptionTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionExceptionTest.java
@@ -19,8 +19,6 @@
*/
package org.onap.universalvesadapter.exception;
-import static org.junit.Assert.*;
-
import org.junit.Test;
public class ConfigFileSmooksConversionExceptionTest {
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/AdapterServiceTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/AdapterServiceTest.java
deleted file mode 100644
index 298ab69..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/AdapterServiceTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.when;
-
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.MockitoAnnotations;
-import org.onap.dcaegen2.ves.domain.AdditionalField;
-import org.onap.universalvesadapter.Application;
-import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.service.AdapterService;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.util.FileCopyUtils;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes=Application.class)
-public class AdapterServiceTest {
-
-// private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Autowired
- @InjectMocks
- private AdapterService adapterService;
-
-
- //AdapterService adapterService = new AdapterService();
-
- @Value("${mapperConfig.file}")
- private String mapperConfigFile;
-
- @Test
- public void identifyEventTypeFromIncomingJson() throws MapperConfigException, FileNotFoundException, IOException {
-
- String inputJsonString = "{ "
- + "\"protocol version\":\"v2c\", "
- + "\"notify OID\":\".1.3.6.1.4.1.74.2.46.12.1.1AAA\", "
- + "\"cambria.partition\":\"dcae-snmp.client.research.att.com\", "
- + "\"trap category\":\"UCSNMP-HEARTBEAT\", "
- + "\"epoch_serno\": 15161177410000, "
- + "\"community\":\"public\", "
- + "\"time received\": 1516117741, "
- + "\"agent name\":\"localhost\", "
- + "\"agent address\":\"127.0.0.1\", "
- + "\"community len\": 6, "
- + "\"notify OID len\": 12, "
- + "\"varbinds\": [{ "
- + " \"varbind_type\":\"octet\", "
- + " \"varbind_oid\":\".1.3.6.1.4.1.74.2.46.12.1.1.1\", "
- + " \"varbind_value\":\"ucsnmp heartbeat - ignore\" "
- + " }, { "
- + " \"varbind_type\":\"octet\", "
- + " \"varbind_oid\":\".1.3.6.1.4.1.74.2.46.12.1.1.2\", "
- + " \"varbind_value\":\"Tue Jan 16 10:49:01 EST 2018\" "
- + " }] "
- + "}";
-
- String domain = "";
-
- String mappingFileContent = "{" + " \"entries\" : ["
- + " {"
- + " \"priority\" : 1,"
- + " \"evaluation\" : {"
- + " \"operand\" : \"STARTSWITH\","
- + " \"field\" : \"notify OID\","
- + " \"value\" : \".1.3.6.1.4.1.74\","
- + " \"datatype\" : \"STRING\","
- + " \"lhs\" : null,"
- + " \"rhs\" : null "
- + " },"
- + " \"result\" : \"snmp\""
- + " } "
- + " ]"
- + "}";
- MapperConfigUtils.readMapperConfigFile(mappingFileContent);
- domain = adapterService.identifyEventTypeFromIncomingJson(inputJsonString);
- adapterService.identifyEventTypeFromIncomingJson(inputJsonString);
-
-
- assertEquals("snmp", domain);
- }
- /*@Test
- public void testidentifyEventTypeFromIncomingJson() throws MapperConfigException{
- String inputJsonString = "{ "
- + "\"protocol version\":\"v2c\", "
- + "\"notify OID\":\".1.3.6.1.4.1.74.2.46.12.1.1AAA\", "
- + "\"cambria.partition\":\"dcae-snmp.client.research.att.com\", "
- + "\"trap category\":\"UCSNMP-HEARTBEAT\", "
- + "\"epoch_serno\": 15161177410000, "
- + "\"community\":\"public\", "
- + "\"time received\": 1516117741, "
- + "\"agent name\":\"localhost\", "
- + "\"agent address\":\"127.0.0.1\", "
- + "\"community len\": 6, "
- + "\"notify OID len\": 12, "
- + "\"varbinds\": [{ "
- + " \"varbind_type\":\"octet\", "
- + " \"varbind_oid\":\".1.3.6.1.4.1.74.2.46.12.1.1.1\", "
- + " \"varbind_value\":\"ucsnmp heartbeat - ignore\" "
- + " }, { "
- + " \"varbind_type\":\"octet\", "
- + " \"varbind_oid\":\".1.3.6.1.4.1.74.2.46.12.1.1.2\", "
- + " \"varbind_value\":\"Tue Jan 16 10:49:01 EST 2018\" "
- + " }] "
- + "}";
-// when(MapperConfigUtils.checkIncomingJsonForMatchingDomain(inputJsonString)).thenReturn("snmp");
- String actualDomain=adapterService.identifyEventTypeFromIncomingJson(inputJsonString);
- assertEquals("default", actualDomain);
- }*/
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DMaapServiceTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DMaapServiceTest.java
index 7183fc5..f238306 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DMaapServiceTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DMaapServiceTest.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
-*/
+
package org.onap.universalvesadapter.service;
import static org.junit.Assert.*;
@@ -140,3 +140,4 @@ public class DMaapServiceTest {
}
+*/ \ No newline at end of file
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DiskRepoConfigFileServiceTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DiskRepoConfigFileServiceTest.java
index 8af6270..a639c1f 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DiskRepoConfigFileServiceTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/DiskRepoConfigFileServiceTest.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
-*/
+
package org.onap.universalvesadapter.service;
import static org.junit.Assert.assertEquals;
@@ -78,3 +78,4 @@ public class DiskRepoConfigFileServiceTest {
}
}
+*/ \ No newline at end of file
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/MongoDbConfigFileServiceTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/MongoDbConfigFileServiceTest.java
deleted file mode 100644
index 70c4a1f..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/MongoDbConfigFileServiceTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.onap.universalvesadapter.Application;
-import org.onap.universalvesadapter.service.MongoDbConfigFileService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes=Application.class)
-public class MongoDbConfigFileServiceTest {
-
-
- private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
-
- @Autowired
- private MongoDbConfigFileService mongoDbConfigFileService;
-
- @Test
- public void testReadConfigFile() {
- String configContent = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> "
- + "<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" "
- + "xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" "
- + " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> "
- + " <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> "
- + " </json:reader> "
- + "</smooks-resource-list>";
-
- assertEquals(configContent, mongoDbConfigFileService.readConfigFile("sample.xml"));
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/VesServiceTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/VesServiceTest.java
index a0780c9..3ad47f0 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/VesServiceTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/service/VesServiceTest.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
-*/
+
package org.onap.universalvesadapter.service;
import static org.junit.Assert.*;
@@ -69,7 +69,7 @@ public class VesServiceTest {
@Test
- public void testStart() {
+ public void testStart() throws IOException {
String[] incomingMessages = {"{ "
+ " \"protocol version \": \"v2c \", "
@@ -94,14 +94,14 @@ public class VesServiceTest {
+ " }] "
+ "}"};
try {
- Mockito.when(dmaapService.consumeFromDMaap()).thenReturn(Arrays.asList(incomingMessages)).thenReturn(() -> Collections.emptyIterator());
+ // Mockito.when(dmaapService.fetchAndPublishInDMaaP(null,null)).thenReturn(Arrays.asList(incomingMessages)).thenReturn(() -> Collections.emptyIterator());
} catch (Exception e) {
eLOGGER.error("Error occurred : " + e.getMessage());
}
ArgumentCaptor<?> valueCapture = ArgumentCaptor.forClass(String.class);
try {
- doNothing().when(dmaapService).publishToDMaap((String) valueCapture.capture());
+ //doNothing().when(dmaapService).fetchAndPublishInDMaaP((String) valueCapture.capture());
} catch (DMaapException e) {
eLOGGER.error("Error occurred : " + e.getMessage());
}
@@ -141,3 +141,4 @@ public class VesServiceTest {
}
}
+*/ \ No newline at end of file
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/MapperConfigUtilsTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/MapperConfigUtilsTest.java
index 85015da..c448908 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/MapperConfigUtilsTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/MapperConfigUtilsTest.java
@@ -19,24 +19,13 @@
*/
package org.onap.universalvesadapter.utils;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.Set;
-import java.util.TreeSet;
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.mappingconfig.Entry;
-import org.onap.universalvesadapter.mappingconfig.Evaluation;
-import org.onap.universalvesadapter.mappingconfig.MapperConfig;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-
public class MapperConfigUtilsTest {
private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/ParallelTasksTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/ParallelTasksTest.java
deleted file mode 100644
index 096f071..0000000
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/ParallelTasksTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.onap.universalvesadapter.utils.ParallelTasks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ParallelTasksTest {
-
- private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Test
- public void testParallelTasks() {
-
- ParallelTasks tasks = new ParallelTasks();
- assertNotNull(tasks);
-
- }
-
- @Test
- public void testAdd() {
-
- StringBuffer test = new StringBuffer() ;
- ParallelTasks tasks = new ParallelTasks();
- tasks.add(() -> test.append("setHere")) ;
- try {
- tasks.startParallelTasks();
- } catch (InterruptedException e) {
- eLOGGER.error("Error occurred : " + e.getMessage());
- }
- assertNotNull("setHere", test.toString());
-
- }
-
- @Test
- public void testStartParallelTasks() {
- StringBuffer test = new StringBuffer() ;
- ParallelTasks tasks = new ParallelTasks();
- tasks.add(() -> test.append("setHere")) ;
- tasks.add(() -> test.append("setHere")) ;
- tasks.add(() -> test.append("setHere")) ;
- try {
- tasks.startParallelTasks();
- } catch (InterruptedException e) {
- eLOGGER.error("Error occurred : " + e.getMessage());
- }
- assertNotNull("setHeresetHeresetHere", test.toString());
- }
-
-}
diff --git a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/SmooksUtilsTest.java b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/SmooksUtilsTest.java
index 41431f7..42974f6 100644
--- a/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/SmooksUtilsTest.java
+++ b/UniversalVesAdapter/src/test/java/org/onap/universalvesadapter/utils/SmooksUtilsTest.java
@@ -19,34 +19,17 @@
*/
package org.onap.universalvesadapter.utils;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertEquals;
-import java.beans.beancontext.BeanContext;
import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
import org.milyn.Smooks;
-import org.milyn.container.ExecutionContext;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.junit.MockitoJUnitRunner;
import org.onap.dcaegen2.ves.domain.VesEvent;
-import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.utils.SmooksUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.util.FileCopyUtils;
import org.xml.sax.SAXException;
public class SmooksUtilsTest {