aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java
diff options
context:
space:
mode:
authorPooja03 <PM00501616@techmahindra.com>2018-08-09 17:49:09 +0530
committerPooja03 <PM00501616@techmahindra.com>2018-08-09 17:49:09 +0530
commitc17ce648ecc3453df8754b936f2b344f13f6dc65 (patch)
treedc57f5dc5bc9a6d02b415b11ceed2ae31e573af3 /UniversalVesAdapter/src/main/java
parent1463aaab6db65130de04d84a68fd9331a1c0caa9 (diff)
Integratation of DMaaP, Mapping File
DMaaP integratation, Mapping File, Initialization of Adapter Change-Id: I826aa2e64fa7c155f088a7519c24887ce88e2ec4 Issue-ID: DCAEGEN2-335 Signed-off-by: Pooja03 <PM00501616@techmahindra.com>
Diffstat (limited to 'UniversalVesAdapter/src/main/java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java8
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java129
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java24
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java187
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java255
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java337
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java56
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java42
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java7
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java37
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java25
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java68
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java383
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java93
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java116
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java91
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java48
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java220
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java45
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java126
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java50
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java60
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java56
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java46
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java128
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java53
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java79
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java57
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java44
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java3
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java29
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java3
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java4
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java19
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java6
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java41
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java290
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java52
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java158
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java185
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java288
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java62
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java12
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java87
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java14
47 files changed, 3296 insertions, 1001 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
index fa3b89c..95b31fb 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java
@@ -19,7 +19,7 @@
*/
package org.onap.universalvesadapter.adapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
+
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
@@ -32,7 +32,6 @@ import org.onap.universalvesadapter.exception.VesException;
*/
public interface GenericAdapter {
-// String transform(String incomingJsonString) throws ConfigFileReadException;
/**
* It will take in an incoming json and identify the json type for different
@@ -42,10 +41,9 @@ public interface GenericAdapter {
* @param incomingJsonString json that is received on DMaap topic
* @param eventType type identified from incoming json
* @return VES format json
- * @throws ConfigFileReadException if unable to read the configuration file
* @throws ConfigFileSmooksConversionException if unable to convert config file data to smooks object
- * @throws VesException if unable to convert into ves json
+ *
*/
- String transform(String incomingJsonString, String eventType) throws ConfigFileReadException, ConfigFileSmooksConversionException, VesException;
+ String transform(String incomingJsonString, String eventType) throws ConfigFileSmooksConversionException, VesException;
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
index 318c8cd..65c7b9c 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
@@ -24,26 +24,25 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-
import org.milyn.Smooks;
import org.onap.dcaegen2.ves.domain.VesEvent;
-import org.onap.universalvesadapter.configs.UniversalEventConfiguration;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.service.ConfigFileService;
+import org.onap.universalvesadapter.service.VESAdapterInitializer;
import org.onap.universalvesadapter.utils.SmooksUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.SAXException;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSyntaxException;
/**
* Default implementation of the Generic Adapter
@@ -51,86 +50,88 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author kmalbari
*
*/
+
@Component
-public class UniversalEventAdapter implements GenericAdapter{
-
+public class UniversalEventAdapter implements GenericAdapter {
+
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private UniversalEventConfiguration configuration;
-
- @Resource(name="diskRepoConfigFileService")
- private ConfigFileService configFileService;
-
-// private Smooks smooks;
-
+
+ private String enterpriseId;
+ @Value("${defaultMappingFileName}")
+ private String defaulMappingFileName;
private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>();
- /*public String transform(String incomingJsonString) throws ConfigFileReadException {
- String result = "";
- try {
- //reading config file.. for now, looking at it as just one time operation
- if(null == smooks){
- String configFileData = configFileService.readConfigFile(configuration.getConfigFile());
- smooks = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
- }
-
- VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooks, incomingJsonString);
- ObjectMapper objectMapper = new ObjectMapper();
- result = objectMapper.writeValueAsString(vesEvent);
- } catch (IOException | SAXException e) {
- e.printStackTrace();
- }
-
- return result;
- }*/
-
+ public UniversalEventAdapter() {
+ }
+ /**
+ * transforms JSON to VES format and and returns the ves Event
+ *
+ * @param IncomingJason,eventType
+ * @return ves Event
+ */
@Override
- public String transform(String incomingJsonString, String eventType) throws ConfigFileReadException,
- ConfigFileSmooksConversionException, VesException {
+ public String transform(String incomingJsonString, String eventType)
+ throws ConfigFileSmooksConversionException, VesException {
String result = "";
+ String configFileData;
try {
- if(null == eventToSmooksMapping.get(eventType)){
- LOGGER.debug("No smooks mapping for this event type " + eventType + ".. reading config file");
- String configFileData = configFileService.readConfigFile(configuration.getConfigForEvent(eventType));
- LOGGER.debug("Read config file " + configFileData);
+
+ Gson gson = new Gson();
+ JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class);
+ JsonElement results = body.get("notify OID");
+ String notifyOid = results.getAsString();
+
+ // extracting enterprise id from notify OID of SNMP trap.
+ enterpriseId = notifyOid.substring(0, notifyOid.length() - 4);
+
+ if (VESAdapterInitializer.getMappingFiles().containsKey(enterpriseId)) {
+ configFileData = VESAdapterInitializer.getMappingFiles().get(enterpriseId);
+ LOGGER.debug("Using Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId);
+ } else {
+
+ configFileData = VESAdapterInitializer.getMappingFiles().get(defaulMappingFileName);
+ LOGGER.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId);
+ }
+
Smooks smooksTemp = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
eventToSmooksMapping.put(eventType, smooksTemp);
- LOGGER.debug("Added smooks mapping for event type" + eventType);
- }
-
-
- LOGGER.debug("Read smooks mapping for event type" + eventType);
- LOGGER.debug("Transforming incoming json now");
- VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(eventToSmooksMapping.get(eventType), incomingJsonString);
+
+ VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,incomingJsonString);
LOGGER.debug("Incoming json transformed to VES format successfully");
ObjectMapper objectMapper = new ObjectMapper();
- result = objectMapper.writeValueAsString(vesEvent);
+ result = objectMapper.writeValueAsString(vesEvent);
LOGGER.debug("Serialized VES json");
} catch (JsonProcessingException exception) {
- throw new VesException("Unable to convert pojo to VES format" + "\n Reason :" + exception.getMessage());
+ throw new VesException("Unable to convert pojo to VES format, Reason :{}", exception);
} catch (SAXException | IOException exception) {
- throw new ConfigFileSmooksConversionException("Unable to convert config file into smooks for event type " + eventType
- + "\n Reason :" + exception.getMessage());
+ //Invalid Mapping file
+ LOGGER.error("Dropping this Trap :{},due to error Occured :Reason:", incomingJsonString, exception);
+
+ } catch (JsonSyntaxException exception) {
+ // Invalid Trap
+ LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception);
+ }catch (JsonParseException exception) {
+ // Invalid Trap
+ LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception);
+ }
+ catch (RuntimeException exception) {
+
+ LOGGER.error("Dropping this Trap :{},Reason:", incomingJsonString, exception);
+
}
return result;
}
-
-
+
/**
* Closes all open smooks' instances before bean is destroyed
*/
@PreDestroy
- public void destroy(){
-// if(null != smooks)
-// smooks.close();
-
- for(Smooks smooks : eventToSmooksMapping.values())
+ public void destroy() {
+ for (Smooks smooks : eventToSmooksMapping.values())
smooks.close();
-
LOGGER.debug("All Smooks objects closed");
- }
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java
deleted file mode 100644
index e47af70..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-public abstract class Configuration {
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java
new file mode 100644
index 0000000..fd4185b
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java
@@ -0,0 +1,187 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.configs;
+
+
+
+import java.util.Locale;
+
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Objects;
+
+
+/**
+ * <p>
+ * Contains common parameters for both DMaaP Message Router Publisher and Subscriber Configs
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ */
+@Component
+public abstract class DMaaPMRBaseConfig {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DMaaPMRBaseConfig.class);
+
+ protected String hostName;
+ protected Integer portNumber;
+ protected String topicName;
+ protected String protocol;
+ protected String userName;
+ protected String userPassword;
+ protected String contentType;
+
+ /**
+ * Provides host name e.g. mrlocal-mtnjftle01.homer.com
+ *
+ * @return host name
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
+
+ /**
+ * Provides Port Number of DMaaP MR Topic Host. Defaults to 80
+ *
+ * @return host port number
+ */
+ public Integer getPortNumber() {
+ return portNumber;
+ }
+
+ /**
+ * Provides topic name e.g. com.dcae.dmaap.mtnje2.DcaeTestVES
+ *
+ * @return topic name
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /**
+ * Provides protocol type e.g. http or https
+ *
+ * @return protocol type
+ */
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * Provides content type e.g. application/json
+ *
+ * @return content type
+ */
+ public String getContentType() {
+ return contentType;
+ }
+
+
+ /**
+ * Provides User name for the DMaaP MR Topic authentication
+ *
+ * @return user name
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * Provides User password for the DMaaP MR Topic authentication
+ *
+ * @return user Password
+ */
+ public String getUserPassword() {
+ return userPassword;
+ }
+
+
+ /**
+ * Trims, adjusts casing and validates user input String for protocol selection
+ *
+ * @param protocol - User input for protocol String
+ * @return - network protocol e.g http or https
+ */
+ protected static String normalizeValidateProtocol(final String protocol) {
+ // validate that only http and https are supported protocols are Supported for DMaaP MR
+ String normalizedProtocolString = protocol.trim().toLowerCase(Locale.ENGLISH);
+ if (normalizedProtocolString.isEmpty() ||
+ !("http".equals(normalizedProtocolString) || "https".equals(normalizedProtocolString))) {
+
+ final String errorMessage =
+ "Unsupported protocol selection. Only HTTPS and HTTPS are currently supported for DMaaP MR";
+
+ throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return normalizedProtocolString;
+ }
+
+
+ /**
+ * Trims, adjust casing and validates content type is supported by DMaaP.
+ *
+ * NOTE: DMaaP currently only support application/json content type
+ *
+ * @param contentType content type that needs to checked for DMaaP MR support
+ * @return true if content type is supported by DMaaP MR
+ */
+ protected static String normalizeValidateContentType(final String contentType) {
+ // Current DMaaP MR is only supporting "application/json" content type
+ String normalizedContentType = contentType.trim().toLowerCase(Locale.ENGLISH);
+ final boolean isSupported = contentType.equals(HTTPUtils.JSON_APPLICATION_TYPE);
+ if (!isSupported) {
+ final String errorMessage =
+ "Unsupported content type selection. Only application/json is currently supported for DMaaP MR";
+
+ throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return normalizedContentType;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DMaaPMRBaseConfig)) {
+ return false;
+ }
+ DMaaPMRBaseConfig that = (DMaaPMRBaseConfig) o;
+ return Objects.equal(hostName, that.hostName) &&
+ Objects.equal(portNumber, that.portNumber) &&
+ Objects.equal(topicName, that.topicName) &&
+ Objects.equal(protocol, that.protocol) &&
+ Objects.equal(userName, that.userName) &&
+ Objects.equal(userPassword, that.userPassword) &&
+ Objects.equal(contentType, that.contentType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(hostName, portNumber, topicName, protocol, userName, userPassword, contentType);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
new file mode 100644
index 0000000..70fcf58
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
@@ -0,0 +1,255 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+
+//TO-DO Lisence Issue ASK to Kedar???????????
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for DMaaP MR Publisher.
+ * <p>
+ * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber
+ * Configuration
+ * </p>
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ *
+ */
+@ComponentScan
+public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig {
+
+
+ /**
+ * Publisher batching queue size
+ */
+ private int maxBatchSize;
+
+ /**
+ * Publisher Recovery Queue Size
+ */
+ private int maxRecoveryQueueSize;
+
+ /**
+ * Default uri path prefix
+ */
+ private String dmaapUriPathPrefix ;
+
+ private DMaaPMRPublisherConfig(@Nonnull String hostName, @Nonnull Integer portNumber, @Nonnull String topicName,
+ @Nonnull String protocol, String userName, String userPassword, @Nonnull String contentType,
+ int maxBatchSize, int maxRecoveryQueueSize,String dmaapUriPathPrefix) {
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+ this.topicName = topicName;
+ this.protocol = protocol;
+ this.userName = userName;
+ this.userPassword = userPassword;
+ this.contentType = contentType;
+ this.maxBatchSize = maxBatchSize;
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ this.dmaapUriPathPrefix =dmaapUriPathPrefix;
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private int maxBatchSize;
+ private int maxRecoveryQueueSize;
+ private String dmaapUriPathPrefix ;
+
+
+
+ public Builder(@Nonnull String hostName, @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+ this.hostName = hostName;
+ this.topicName = topicName;
+ // Default values
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol =dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+
+ this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE();
+ this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE();
+ this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+ }
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber
+ * custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication
+ * will be disabled
+ *
+ * @param userName
+ * user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ /**
+ * Setup user password for authentication. If no password is provided
+ * authentication will be disabled
+ *
+ * @param userPassword
+ * user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher protocol - Defaults to https. Note: Only http and
+ * https are currently supported.
+ *
+ * @param protocol
+ * protocol e.g. https
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher content-type - Defaults to application/json
+ *
+ * @param contentType
+ * content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher Max Batch Size - Defaults to 100
+ *
+ * @param maxBatchSize
+ * max Batch Size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ /**
+ * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold
+ * messages temporarily in case DMaaP MR Publisher topic is not responding for
+ * any reason. Defaults to 100,000
+ *
+ * @param maxRecoveryQueueSize
+ * max recovery queue size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) {
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ return this;
+ }
+
+
+ /**
+ * Creates immutable instance of {@link DMaaPMRPublisherConfig}
+ *
+ * @return Builds and returns thread safe, immutable
+ * {@link DMaaPMRPublisherConfig} object
+ */
+ public DMaaPMRPublisherConfig build() {
+ return new DMaaPMRPublisherConfig(hostName, portNumber, topicName, protocol, userName, userPassword,
+ contentType, maxBatchSize, maxRecoveryQueueSize,dmaapUriPathPrefix);
+ }
+
+ }
+ public String getDmaapUriPathPrefix() {
+ return dmaapUriPathPrefix;
+ }
+
+ /**
+ * Returns max Publisher Batch Queue Size
+ *
+ * @return max Publisher Batch Queue size
+ */
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /**
+ * Returns max Publisher Recovery Queue Size
+ *
+ * @return max Recovery Queue size
+ */
+ public int getMaxRecoveryQueueSize() {
+ return maxRecoveryQueueSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o;
+ return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
new file mode 100644
index 0000000..2b8158a
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
@@ -0,0 +1,337 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.onap.universalvesadapter.utils.DmaapConfig;
+
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for Subscriber.
+ * <p>
+ * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ */
+public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig {
+
+ private final String consumerId;
+ private final String consumerGroup;
+ private final int timeoutMS;
+ private final int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPrefix;
+
+ private DMaaPMRSubscriberConfig(@Nonnull String hostName,
+ @Nonnull Integer portNumber,
+ @Nonnull String topicName,
+ @Nonnull String protocol,
+ String userName,
+ String userPassword,
+ @Nonnull String contentType,
+ @Nonnull String consumerId,
+ @Nonnull String consumerGroup,
+ @Nonnull int timeoutMS,
+ @Nonnull int messageLimit,
+ String timeoutMSParam,
+ String messageLimitParam,
+ String uriPrefix) {
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+ this.topicName = topicName;
+ this.protocol = protocol;
+ this.userName = userName;
+ this.userPassword = userPassword;
+ this.contentType = contentType;
+ this.consumerId = consumerId;
+ this.consumerGroup = consumerGroup;
+ this.timeoutMS = timeoutMS;
+ this.messageLimit = messageLimit;
+ this.timeoutMSParam=timeoutMSParam;
+ this.messageLimitParam=messageLimitParam;
+ this.uriPrefix=uriPrefix;
+
+
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private String consumerId;
+ private String consumerGroup;
+ private int timeoutMS;
+ private int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPreifix;
+
+
+
+ public Builder(@Nonnull String hostName,
+ @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+
+ // Required Values
+ this.hostName = hostName;
+ this.topicName = topicName;
+ // Default values
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol = dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+ this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID();
+ this.consumerGroup = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
+ this.timeoutMS =dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
+ this.messageLimit = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
+ this.timeoutMSParam=dmaapConfig.getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
+ this.messageLimitParam=dmaapConfig.getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
+ this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+
+ }
+
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication will be disabled
+ *
+ * @param userName user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+
+ /**
+ * Setup user password for authentication. If no password is provided authentication will be disabled
+ *
+ * @param userPassword user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Subscriber protocol - Defaults to https.
+ * Note: Only http and https are currently supported.
+ *
+ * @param protocol protocol e.g. https or http
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber content-type - Defaults to application/json
+ *
+ * @param contentType content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Consumer Id - Defaults to random Id
+ *
+ * @param consumerId - custom consumer ID
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerId(@Nonnull String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ /**
+ * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID
+ *
+ * @param consumerGroup - custom Consumer Group
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerGroup(@Nonnull String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ /**
+ * Setup Custom Subscriber timeout in ms - Default to no timeout limit
+ *
+ * @param timeoutMS timeout in milliseconds
+ * @return Builder object itself for chaining
+ */
+ public Builder setTimeoutMS(@Nonnull int timeoutMS) {
+ this.timeoutMS = timeoutMS;
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber Message Limit - Default to no limit
+ *
+ * @param messageLimit message Limit
+ * @return Builder object itself for chaining
+ */
+ public Builder setMessageLimit(@Nonnull int messageLimit) {
+ this.messageLimit = messageLimit;
+ return this;
+ }
+
+ /**
+ * Builds Immutable instance of {@link DMaaPMRSubscriberConfig}
+ *
+ * @return immutable DMaaP Subscriber Config Object
+ * @throws IOException
+ */
+ public DMaaPMRSubscriberConfig build() throws IOException {
+ return new DMaaPMRSubscriberConfig(hostName, portNumber, topicName, protocol, userName, userPassword,
+ contentType, consumerId, consumerGroup, timeoutMS, messageLimit, timeoutMSParam, messageLimitParam,
+ uriPreifix);
+ }
+
+ }
+
+
+ public String getTimeoutMSParam() {
+ return timeoutMSParam;
+ }
+
+ public void setTimeoutMSParam(String timeoutMSParam) {
+ this.timeoutMSParam = timeoutMSParam;
+ }
+
+ public String getMessageLimitParam() {
+ return messageLimitParam;
+ }
+
+ public void setMessageLimitParam(String messageLimitParam) {
+ this.messageLimitParam = messageLimitParam;
+ }
+
+ public String getUriPrefix() {
+ return uriPrefix;
+ }
+
+ public void setUriPrefix(String uriPreifix) {
+ this.uriPrefix = uriPreifix;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Id
+ *
+ * @return consumer Id
+ */
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Group
+ *
+ * @return consumer group
+ */
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ /**
+ * DMaaP MR Subscriber Timeout in ms
+ *
+ * @return subscriber timeout ms
+ */
+ public int getTimeoutMS() {
+ return timeoutMS;
+ }
+
+ /**
+ * DMaaP MR Subscriber message limit
+ *
+ * @return subscriber message limit
+ */
+ public int getMessageLimit() {
+ return messageLimit;
+ }
+
+ /**
+ * DMaaP property file
+ *
+ * @return Properties
+ */
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o;
+ return Objects.equal(consumerId, that.consumerId) &&
+ Objects.equal(consumerGroup, that.consumerGroup) &&
+ Objects.equal(timeoutMS, that.timeoutMS) &&
+ Objects.equal(messageLimit, that.messageLimit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit);
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java
deleted file mode 100644
index 2b9a820..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * Configuration for Dmaap MR Service
- *
- * @author kmalbari
- *
- */
-@Component
-public class DMaapMrUrlConfiguration extends Configuration {
-
- @Value("${dmaap.url}")
- private String url;
-
- @Value("${dmaap.consumer_props}")
- private String consumerProperties;
-
- @Value("${dmaap.publisher_props}")
- private String publisherProperties;
-
- public String getPublisherProperties() {
- return publisherProperties;
- }
-
- public String getConsumerProperties() {
- return consumerProperties;
- }
-
- public String getUrl() {
- return url;
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java
deleted file mode 100644
index b1daf0d..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.configs;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- *
- * Configuration for disk repository service
- *
- * @author kmalbari
- *
- */
-@Component
-public class DiskRepoConfiguration extends Configuration {
-
- @Value("${fileService.url}")
- private String fileRepositoryUrl;
-
- public String getFileRepositoryUrl() {
- return fileRepositoryUrl;
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
index 3edca56..6f85ef3 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java
@@ -33,12 +33,9 @@ import org.springframework.stereotype.Component;
*
*/
@Component
-public class UniversalEventConfiguration extends Configuration {
+public class UniversalEventConfiguration{
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Value("${snmpTrap.configFile}")
- private String configFile;
@Value("${universal.configFiles}")
private String configFiles;
@@ -68,6 +65,4 @@ public class UniversalEventConfiguration extends Configuration {
}
- //think about adding mapping files on runtime as well
-
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java
deleted file mode 100644
index e844270..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.controller;
-
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
-
-
-@RestController
-public class MockDmaapController {
-
- @RequestMapping("/greeting")
- public String greeting(@RequestParam(value="name", defaultValue="World") String name) {
-// return new Greeting(counter.incrementAndGet(),
-// String.format(template, name));
- return "{ \"protocol version\": \"v2c\", \"notify OID\": \".1.3.6.1.4.1.74.2.46.12.1.1\", \"cambria.partition\": \"dcae-snmp.client.research.att.com\" , \"trap category\": \"UCSNMP-HEARTBEAT\", \"epoch_serno\": 15161177410000, \"community\" : \"public\", \"time received\": 1516117741, \"agent name\": \"localhost\", \"agent address\": \"127.0.0.1\", \"community len\": 6, \"notify OID len\": 12, \"varbinds\": [ { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.1\", \"varbind_value\": \"ucsnmp heartbeat - ignore\" }, { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.2\", \"varbind_value\": \"Tue Jan 16 10:49:01 EST 2018\" } ] }";
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
index 588c912..dfddfde 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java
@@ -19,6 +19,8 @@
*/
package org.onap.universalvesadapter.controller;
+import org.onap.universalvesadapter.exception.MapperConfigException;
+import org.onap.universalvesadapter.service.VESAdapterInitializer;
import org.onap.universalvesadapter.service.VesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController;
*
* @author kmalbari
*/
+
@RestController
public class VesController {
@@ -40,17 +43,35 @@ public class VesController {
@Autowired
private VesService vesService;
+ @Autowired
+ private VESAdapterInitializer vESAdapterInitializer;
+
/**
* @return message that application is started
*/
@RequestMapping("/start")
public String start() {
- LOGGER.debug("UniversalVesAdapter Application starting...");
- vesService.start();
+ LOGGER.info("UniversalVesAdapter Application starting...");
+
+ try {
+ vesService.start();
+ } catch (MapperConfigException e) {
+
+ LOGGER.error("Config error:{}",e.getMessage(),e.getCause());
+ }
return "Application started";
}
+ @RequestMapping("/reload")
+ public void reloadMappingFileFromDB() {
+ LOGGER.debug("Reload of Mapping File is started");
+ vESAdapterInitializer.fetchMappingFile();
+ LOGGER.debug("Reload of Mapping File is completed");
+ }
+
+
+
/**
* @return message that application stop process is triggered
*/
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java
new file mode 100644
index 0000000..efd0b0d
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java
@@ -0,0 +1,68 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherImpl;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueFactory;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueImpl;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberImpl;
+import org.springframework.stereotype.Component;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+
+
+/**
+ * Guice Module to wire concrete implementations with interfaces
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+@Component
+public class AnalyticsDMaaPModule extends AbstractModule {
+
+
+ @Override
+ protected void configure() {
+
+ // Bind Http Client
+ bind(CloseableHttpClient.class).toInstance(HttpClients.createDefault());
+
+ // Bind Publishing queue
+ install(new FactoryModuleBuilder().implement(DMaaPMRPublisherQueue.class, DMaaPMRPublisherQueueImpl.class)
+ .build(DMaaPMRPublisherQueueFactory.class));
+
+ install(new FactoryModuleBuilder().implement(DMaaPMRPublisher.class, DMaaPMRPublisherImpl.class)
+ .build(DMaaPMRPublisherFactory.class));
+
+ install(new FactoryModuleBuilder().implement(DMaaPMRSubscriber.class, DMaaPMRSubscriberImpl.class)
+ .build(DMaaPMRSubscriberFactory.class));
+
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java
new file mode 100644
index 0000000..0638574
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java
@@ -0,0 +1,383 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+
+import static java.lang.String.format;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponse;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponseImpl;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponse;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponseImpl;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.ComponentScan;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+
+/**
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+@ComponentScan
+public abstract class BaseDMaaPMRComponent {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ public BaseDMaaPMRComponent() {}
+
+
+ /**
+ * Creates Base64 encoded Auth Header for given userName and Password
+ * If either user name of password are null return absent
+ *
+ * @param userName username
+ * @param userPassword user password
+ * @return base64 encoded auth header if username or password are both non null
+ */
+ protected static Optional<String> getAuthHeader(@Nullable final String userName,
+ @Nullable final String userPassword) {
+ if (userName == null || userPassword == null) {
+ return Optional.absent();
+ } else {
+ final String auth = userName + ":" + userPassword;
+ final Charset isoCharset = Charset.forName("ISO-8859-1");
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
+ return Optional.of("Basic " + new String(encodedAuth, isoCharset));
+ }
+ }
+
+
+ /**
+ * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
+ *
+ * @param publisherConfig publisher settings
+ *
+ * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
+ */
+ protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
+ final String hostName = publisherConfig.getHostName();
+ final Integer portNumber = publisherConfig.getPortNumber();
+ final String getProtocol = publisherConfig.getProtocol();
+ final String topicName = publisherConfig.getTopicName();
+ final String dmaapUriPathPrefix =publisherConfig.getDmaapUriPathPrefix();
+ URI publisherURI = null;
+ try {
+ publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(dmaapUriPathPrefix + topicName).build();
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating publisher URI: %s", e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
+ return publisherURI;
+ }
+
+
+ /**
+ * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
+ *
+ * @param subscriberConfig subscriber settings
+ *
+ * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
+ */
+ protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
+ final String hostName = subscriberConfig.getHostName();
+ final Integer portNumber = subscriberConfig.getPortNumber();
+ final String getProtocol = subscriberConfig.getProtocol();
+ final String topicName = subscriberConfig.getTopicName();
+ final String consumerId = subscriberConfig.getConsumerId();
+ final String consumerGroup = subscriberConfig.getConsumerGroup();
+ final Integer timeoutMS = subscriberConfig.getTimeoutMS();
+ final Integer messageLimit = subscriberConfig.getMessageLimit();
+ URI subscriberURI = null;
+
+ try {
+ URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(subscriberConfig.getUriPrefix()
+ + topicName + "/"
+ + consumerGroup + "/" +
+ consumerId);
+ // add query params if present
+ if (timeoutMS > 0) {
+ uriBuilder.addParameter(subscriberConfig.getTimeoutMSParam(), timeoutMS.toString());
+ }
+ if (messageLimit > 0) {
+ uriBuilder.addParameter(subscriberConfig.getMessageLimitParam(),
+ messageLimit.toString());
+ }
+ subscriberURI = uriBuilder.build();
+
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating subscriber URI: %s", e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
+ return subscriberURI;
+ }
+
+
+ /**
+ * Creates 202 (Accepted) Response code message
+ *
+ * @param batchQueueSize batch Queue size
+ *
+ * @return response with 202 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
+ return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
+ "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
+ }
+
+
+ /**
+ * Creates 204 (No Content) Response code message
+ *
+ * @return response with 204 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
+ return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
+ "No Content - No Messages in batch queue for flushing to MR Topic", 0);
+ }
+
+
+ /**
+ * Creates Publisher Response for given response code, response Message and pending Message Count
+ *
+ * @param responseCode HTTP Status Code
+ * @param responseMessage response message
+ * @param pendingMessages pending messages in batch queue
+ *
+ * @return DMaaP MR Publisher Response
+ */
+ protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
+ responseMessage, int pendingMessages) {
+ return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
+ }
+
+
+ /**
+ * Returns weekly consistent pending messages in batch queue
+ *
+ * @param publisherQueue batch queue
+ * @param publisherConfig publisher settings
+ *
+ * @return pending messages to be published
+ */
+ protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
+ @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
+ return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
+ }
+
+
+ /**
+ * Creates Subscriber Response for give response Code, response Message and fetch messages
+ *
+ * @param responseCode response Code
+ * @param responseMessage response Message
+ * @param fetchedMessages fetched messages
+ *
+ * @return DMaaP MR Subscriber Response
+ */
+ protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
+ responseMessage, List<String> fetchedMessages) {
+ if (fetchedMessages == null) {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
+ } else {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
+ }
+ }
+
+
+ /**
+ * Custom response handler which extract status code and response body
+ *
+ * @return Pair containing Response code and response body
+ */
+ protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
+ return new ResponseHandler<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
+ // Get Response status code
+ final int status = response.getStatusLine().getStatusCode();
+ final HttpEntity responseEntity = response.getEntity();
+ // If response entity is not null - extract response body as string
+ String responseEntityString = "";
+ if (responseEntity != null) {
+ responseEntityString = EntityUtils.toString(responseEntity);
+ }
+ return new ImmutablePair<>(status, responseEntityString);
+ }
+ };
+ }
+
+
+ /**
+ * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
+ * be lost
+ *
+ * @param publisherQueue publisher queue
+ * @param messages recoverable messages to be published to recovery queue
+ */
+ protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
+ List<String> messages) {
+ try {
+ publisherQueue.addRecoverableMessages(messages);
+
+ LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
+ messages.size(), publisherQueue.getBatchQueueRemainingSize());
+
+ } catch (IllegalStateException e) {
+ final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
+ "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
+ messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ }
+
+
+
+
+
+ /**
+ * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
+ *
+ * @param messages messages that need to parsed to Json Array representation
+ * @return json string representation of message
+ */
+ protected static String convertToJsonString(@Nullable final List<String> messages) {
+ // If messages are null or empty just return empty array
+ if (messages == null || messages.isEmpty()) {
+ return "[]";
+ }
+
+
+ List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
+
+ try {
+ for (String message : messages) {
+ final JsonNode jsonNode = objectMapper.readTree(message);
+ jsonMessageObjectsList.add(jsonNode);
+ }
+ return objectMapper.writeValueAsString(jsonMessageObjectsList);
+ } catch (JsonProcessingException e) {
+ final String errorMessage =
+ format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DMaapException(errorMessage, LOG, e);
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts subscriber messages json string to List of messages. If message Json String is empty
+ * or null
+ *
+ * @param messagesJsonString json messages String
+ *
+ * @return List containing DMaaP MR Messages
+ */
+ protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+
+ final LinkedList<String> messages = new LinkedList<>();
+
+ // If message string is not null or not empty parse json message array to List of string messages
+ if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
+ && !("[]").equals(messagesJsonString.trim())) {
+
+ try {
+ // get root node
+ final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+ // iterate over root node and parse arrays messages
+ for (JsonNode jsonNode : rootNode) {
+ // if array parse it is array of messages
+ final String incomingMessageString = jsonNode.toString();
+ if (jsonNode.isArray()) {
+ final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+ for (Object message : messageList) {
+ final String jsonMessageString = objectMapper.writeValueAsString(message);
+ addUnescapedJsonToMessage(messages, jsonMessageString);
+ }
+ } else {
+ // parse it as object
+ addUnescapedJsonToMessage(messages, incomingMessageString);
+ }
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
+ " Json Error: %s", messagesJsonString, e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+ return messages;
+ }
+
+ /**
+ * Adds unescaped Json messages to given messages list
+ *
+ * @param messages message list in which unescaped messages will be added
+ * @param incomingMessageString incoming message string that may need to be escaped
+ */
+ private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+ if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+ messages.add(StringEscapeUtils.unescapeJava(
+ incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+ } else {
+ messages.add(StringEscapeUtils.unescapeJava(incomingMessageString));
+ }
+ }
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
new file mode 100644
index 0000000..cd76f79
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
@@ -0,0 +1,93 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.dmaap;
+
+import java.io.IOException;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class Creator {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(Creator.class);
+ private DMaaPMRFactory dMaaPMRFactoryInstance;
+ private String hostname;
+ private String publisherTopic;
+ private String subcriberTopic;
+ private DmaapConfig dmaapConfig;
+
+ @Autowired
+ public void setDmaapConfig(DmaapConfig dmaapConfig) {
+ this.dmaapConfig = dmaapConfig;
+ }
+
+ public Creator() {
+
+ }
+
+ // prop initializer
+
+ public void propertyFileInitializer() {
+
+ this.hostname = dmaapConfig.getHostname();
+ this.publisherTopic = dmaapConfig.getPublisherTopic();
+ this.subcriberTopic = dmaapConfig.getSubscriberTopic();
+ this.dMaaPMRFactoryInstance = DMaaPMRFactory.create();
+ LOGGER.info("The Hostname of DMaap is :" + hostname);
+
+
+ }
+
+ // publisher
+ public DMaaPMRPublisher getDMaaPMRPublisher(){
+ propertyFileInitializer();
+ DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null;
+ try {
+ dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(hostname, publisherTopic,dmaapConfig).build();
+ } catch (IOException e) {
+ LOGGER.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause());
+ }
+ return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig);
+ }
+
+ // subscriber
+ public DMaaPMRSubscriber getDMaaPMRSubscriber(){
+ propertyFileInitializer();
+ DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null;
+ try {
+ dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(hostname, subcriberTopic, dmaapConfig).build();
+ } catch (IOException e) {
+
+ LOGGER.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause());
+ }
+
+ return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig);
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java
new file mode 100644
index 0000000..50f214c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java
@@ -0,0 +1,116 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap;
+
+import javax.annotation.Nonnull;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+
+/**
+ * Creates pre injected implementations for {@link DMaaPMRPublisher} and {@link DMaaPMRSubscriber}
+ * <p>
+ * Usage:
+ * <p>Create an instance of DMaaP MR Factory</p>
+ * <pre>
+ * DMaaPFactory dmaapFactory = DMaaPFactory.initalize()
+ * </pre>
+ * <p>Create a new DMaaP MR Publisher</p>
+ * <pre>
+ * DMaaPMRPublisher publisher = dmaapFactory.createPublisher(publisherConfig)
+ * </pre>
+ * <p>Create new DMaaP MR Subscriber</p>
+ * <pre>
+ * DMaaPMRSubscriber subscriber = dmaapFactory.createSubscriber(subscriberConfig)
+ * </pre>
+ * <p>
+ * <strong>All Clients must use this Factory to initalize DMaaP Message Router Publishers and Subscribers</strong>
+ * </p>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+
+public class DMaaPMRFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRFactory.class);
+
+ private final Injector injector;
+
+ public DMaaPMRFactory(AbstractModule guiceModule) {
+ injector = Guice.createInjector(guiceModule);
+ }
+
+ /**
+ * Returns configured instance of {@link DMaaPMRPublisher}
+ *
+ * @param publisherConfig Publisher Config
+ * @return configured instance of DMaaP MR Publisher
+ */
+ public DMaaPMRPublisher createPublisher(@Nonnull DMaaPMRPublisherConfig publisherConfig) {
+ final DMaaPMRPublisherFactory publisherFactory = injector.getInstance(Key.get(DMaaPMRPublisherFactory.class));
+ LOG.debug("Creating new DMaaP MR Publisher Instance with configuration: {}", publisherConfig);
+ final DMaaPMRPublisher dMaaPMRPublisher = publisherFactory.create(publisherConfig);
+
+ LOG.info("Created new DMaaP MR Publisher Instance. Publisher creation time: {}",
+ dMaaPMRPublisher.getPublisherCreationTime());
+ return dMaaPMRPublisher;
+ }
+
+ /**
+ * Returns configured instance of {@link DMaaPMRSubscriber}
+ *
+ * @param subscriberConfig Subscriber Config
+ * @return configured instance of DMaaP MR Subscriber
+ */
+ public DMaaPMRSubscriber createSubscriber(@Nonnull DMaaPMRSubscriberConfig subscriberConfig) {
+ final DMaaPMRSubscriberFactory subscriberFactory = injector.getInstance(DMaaPMRSubscriberFactory.class);
+ LOG.debug("Creating new DMaaP MR Subscriber Instance with configuration: {}", subscriberConfig);
+ final DMaaPMRSubscriber dMaaPMRSubscriber = subscriberFactory.create(subscriberConfig);
+ LOG.info("Created new DMaaP MR Subscriber Instance. Subscriber creation time: {}",
+ dMaaPMRSubscriber.getSubscriberCreationTime());
+ return dMaaPMRSubscriber;
+ }
+
+ /**
+ * Creates an instance of {@link DMaaPMRFactory}
+ *
+ * @return {@link DMaaPMRFactory} factory instance
+ */
+ public static DMaaPMRFactory create() {
+ final DMaaPMRFactory dMaaPMRFactory = new DMaaPMRFactory(new AnalyticsDMaaPModule());
+ LOG.info("Created new instance of DMaaP MR Factory");
+ return dMaaPMRFactory;
+ }
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java
new file mode 100644
index 0000000..6d867b9
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java
@@ -0,0 +1,91 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * <p>
+ * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisher extends AutoCloseable {
+
+
+ /**
+ * <p>
+ * Adds collection of messages to DMaaP MR Topic Publishing Queue.
+ * <p>
+ * Note: Invoking this method may or may not cause publishing immediately
+ * as publishing in done is batch mode by default. Parameter maxBatchSize
+ * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size.
+ * If the maxBatchSize is reached all message will be published automatically
+ * during subsequent call.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return response which may contain Http Response code 202 (Accepted) as publishing
+ * will proceed when max batch size is reached. Throws {@link DMaapException}
+ * if publishing fails
+ */
+ DMaaPMRPublisherResponse publish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse}
+ * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return DMaaP Message Router Publisher Response. Throws {@link DMaapException}
+ * if force publishing fails
+ *
+ */
+ DMaaPMRPublisherResponse forcePublish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns
+ * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to
+ * be flushed response code 304 (Not Modified) will be returned
+ * </p>
+ *
+ * @return DMaaP Message Router Publisher Response
+ */
+ DMaaPMRPublisherResponse flush();
+
+
+ /**
+ * <p>
+ * Returns the creation time when Publisher instance was created.
+ * <p>
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getPublisherCreationTime();
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java
new file mode 100644
index 0000000..b60c74c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java
@@ -0,0 +1,48 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Publisher instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRPublisherFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher
+ *
+ * @param publisherConfig publisher config
+ *
+ * @return DMaaP MR Publisher instance
+ */
+ DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig);
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java
new file mode 100644
index 0000000..5930f41
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java
@@ -0,0 +1,220 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import static java.lang.String.format;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+
+/**
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+@ComponentScan
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
+
+ @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
+ private int publisherMaxFlushRetries;
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
+
+ public int a =2;
+ private final DMaaPMRPublisherConfig publisherConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final DMaaPMRPublisherQueue publisherQueue;
+ private final Date publisherCreationTime;
+ private URI publisherUri;
+
+ @Inject
+ public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
+ DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
+ CloseableHttpClient closeableHttpClient){
+
+ this.publisherConfig = publisherConfig;
+ final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
+ this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
+ maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
+ this.closeableHttpClient = closeableHttpClient;
+ this.publisherUri = createPublisherURI(publisherConfig);
+ this.publisherCreationTime = new Date();
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse publish(List<String> messages) {
+
+ final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
+
+ // if messages size is less than batch queue size - just queue them for batch publishing
+ if (batchQueueRemainingSize > messages.size()) {
+ LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
+ messages.size(), batchQueueRemainingSize);
+ final int batchQueueSize = publisherQueue.addBatchMessages(messages);
+ return createPublisherAcceptedResponse(batchQueueSize);
+
+ } else {
+
+ // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
+ "Publisher Topic.");
+ return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
+ }
+
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
+
+ LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
+
+ final String contentType = publisherConfig.getContentType();
+ final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName();
+ final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword();
+ final HttpPost postRequest = new HttpPost(publisherUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
+ }
+
+ // Create post string entity
+ final String messagesJson = convertToJsonString(messages);
+ final StringEntity requestEntity =
+ new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
+ postRequest.setEntity(requestEntity);
+
+ try {
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+ // if messages were published successfully, return successful response
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
+ "Body: {}, Number of Messages published: {}",
+ responseCode, responseBody, messages.size());
+
+ } else {
+ LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
+ "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ }
+
+ return createPublisherResponse(responseCode, responseBody,
+ getPendingMessages(publisherQueue, publisherConfig));
+
+ } catch (IOException e) {
+ // If IO Error then we need to also put messages in recovery queue
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
+ "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
+
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse flush() {
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ // If there are no message return 204 (No Content) response code
+ if (queueMessages.isEmpty()) {
+ LOG.debug("No messages to publish to batch queue. Returning 204 status code");
+ return createPublisherNoContentResponse();
+ } else {
+ // force publish messages in queue
+ return forcePublish(queueMessages);
+ }
+ }
+
+ @Override
+ public Date getPublisherCreationTime() {
+ return new Date(publisherCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ // flush current message in the queue
+ int retrialNumber = 0;
+ int flushResponseCode;
+
+ // automatic retries if messages cannot be flushed
+ do {
+ retrialNumber++;
+ DMaaPMRPublisherResponse flushResponse = flush();
+ flushResponseCode = flushResponse.getResponseCode();
+
+ if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
+ "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
+ publisherMaxFlushRetries);
+
+ Thread.sleep(publisherMaxFlushRetries);
+ }
+ } while (retrialNumber <= publisherMaxFlushRetries &&
+ !HTTPUtils.isSuccessfulResponseCode(flushResponseCode));
+
+ if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
+ } else {
+ LOG.info("Successfully published all batched messages to publisher.");
+ }
+
+ // close http client
+ closeableHttpClient.close();
+
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java
new file mode 100644
index 0000000..18feec8
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java
@@ -0,0 +1,87 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic
+ * is offline for some reason. It does so by having a recovery queue which keeps
+ * messages in order in case there is temporary interruption in DMaaP Publisher
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueue {
+
+ /**
+ * <p>
+ * Add batchMessages to Batch Queue
+ * </p>
+ *
+ * @param batchMessages messages that needs to be added to batch queue
+ * @return current size of batch queue. Throws {@link IllegalStateException}
+ * if batch queue does not have enough space
+ */
+ int addBatchMessages(List<String> batchMessages);
+
+
+ /**
+ * <p>
+ * Add recoverable messages to Recoverable Queue
+ * </p>
+ *
+ * @param recoverableMessages messages that needs to be added to recoverable queue
+ * @return current size of the recoverable queue. Throws {@link IllegalStateException}
+ * if recoverable queue does not have enough space
+ */
+ int addRecoverableMessages(List<String> recoverableMessages);
+
+ /**
+ * <p>
+ * Get messages that need to be published to DMaaP topic. Messages in recoverable
+ * queue are appended if present.
+ * </p>
+ *
+ * @return List of messages from both batch and recovery queue
+ */
+ List<String> getMessageForPublishing();
+
+ /**
+ * <p>
+ * Remaining capacity of Batch Queue
+ * </p>
+ *
+ * @return Remaining Batch Queue Size
+ */
+ int getBatchQueueRemainingSize();
+
+ /**
+ * <p>
+ * Remaining capacity of Recovery Queue
+ * </p>
+ *
+ * @return Remaining Recovery Queue Size
+ */
+ int getRecoveryQueueRemainingSize();
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java
new file mode 100644
index 0000000..f100210
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java
@@ -0,0 +1,45 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueueFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher Queue
+ *
+ * @param batchQueueSize batch queue size
+ * @param recoveryQueueSize recovery queue size
+ *
+ * @return instance of DMaaP MR Publisher Queue
+ */
+ DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize);
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java
new file mode 100644
index 0000000..fc240d6
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java
@@ -0,0 +1,126 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newLinkedList;
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * <p>
+ * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
+ * for batch and recovery queues
+ * </p>
+ *
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
+
+ private final LinkedBlockingDeque<String> batchQueue;
+ private final LinkedBlockingDeque<String> recoveryQueue;
+
+ @Inject
+ public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize) {
+ batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
+ recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
+ LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
+ batchQueueSize, recoveryQueueSize);
+ }
+
+ @Override
+ public synchronized int addBatchMessages(List<String> batchMessages) {
+
+ // checks if batchMessages size does not exceed batch queue capacity
+ if (batchMessages.size() > batchQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue");
+ }
+
+ // Add batchMessages to batch queue
+ for (String message : batchMessages) {
+ batchQueue.add(message);
+ }
+
+ // returns current elements size in batch queue
+ return batchQueue.size();
+ }
+
+ @Override
+ public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
+
+ // checks if messages size does not exceed recovery queue size
+ if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
+ }
+
+ // add messages to recovery queue
+ for (String recoverableMessage : recoverableMessages) {
+ recoveryQueue.add(recoverableMessage);
+ }
+
+ // returns current size of recovery queue
+ return recoveryQueue.size();
+ }
+
+ @Override
+ public synchronized List<String> getMessageForPublishing() {
+
+ final List<String> recoveryMessageList = new LinkedList<>();
+ final List<String> batchMessagesList = new LinkedList<>();
+
+ // get messages from recovery queue if present
+ if (!recoveryQueue.isEmpty()) {
+ final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
+ LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
+ }
+
+ // get messages from batch queue if present
+ if (!batchQueue.isEmpty()) {
+ final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
+ LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
+ }
+
+ // concat recovery and batch queue elements
+ return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
+ }
+
+ @Override
+ public synchronized int getBatchQueueRemainingSize() {
+ return batchQueue.remainingCapacity();
+ }
+
+ @Override
+ public synchronized int getRecoveryQueueRemainingSize() {
+ return recoveryQueue.remainingCapacity();
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java
new file mode 100644
index 0000000..ee12cc8
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java
@@ -0,0 +1,50 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+/**
+ * <p>
+ * Contract for all DMaaPMR Publisher Response
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisherResponse {
+
+ /**
+ * Gets HTTP Response Code
+ *
+ * @return HTTP Response code as String
+ */
+ Integer getResponseCode();
+
+ /**
+ * Gets Response Message
+ *
+ * @return Response Message
+ */
+ String getResponseMessage();
+ /**
+ * Gets number of pending messages
+ *
+ * @return pending messages in the batch queue
+ */
+ int getPendingMessagesCount();
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java
new file mode 100644
index 0000000..4570f78
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java
@@ -0,0 +1,60 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRPublisher;
+
+import javax.annotation.Nonnull;
+
+/**
+ * <p>
+ * An simple implementation of {@link DMaaPMRPublisherResponse}
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRPublisherResponseImpl implements DMaaPMRPublisherResponse {
+
+ private final Integer responseCode;
+ private final String responseMessage;
+ private final int pendingMessagesCount;
+
+ public DMaaPMRPublisherResponseImpl(@Nonnull Integer responseCode, @Nonnull String responseMessage,
+ int pendingMessagesCount) {
+ this.responseCode = responseCode;
+ this.responseMessage = responseMessage;
+ this.pendingMessagesCount = pendingMessagesCount;
+ }
+
+ @Override
+ public Integer getResponseCode() {
+ return responseCode;
+ }
+
+ @Override
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ @Override
+ public int getPendingMessagesCount() {
+ return pendingMessagesCount;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java
new file mode 100644
index 0000000..336e3dd
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java
@@ -0,0 +1,56 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+
+
+import java.util.Date;
+
+/**
+ * <p>
+ * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRSubscriber extends AutoCloseable {
+
+ /**
+ * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters
+ * for messageLimit and message timeout are used
+ *
+ * @return DMaaP Message Router Subscriber Response
+ */
+ DMaaPMRSubscriberResponse fetchMessages();
+
+
+ /**
+ * Returns the Subscriber instance creation time
+ * <p>
+ * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which
+ * are published to the topic after the creation of the Subscriber.
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getSubscriberCreationTime();
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java
new file mode 100644
index 0000000..dcb6f4c
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java
@@ -0,0 +1,46 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+
+/**
+ * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Subscriber instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRSubscriberFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Subscriber Instance
+ *
+ * @param subscriberConfig subscriber config
+ *
+ * @return DMaaP MR Subscriber instance
+ */
+ DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig);
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java
new file mode 100644
index 0000000..2e7aac9
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java
@@ -0,0 +1,128 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.utils.HTTPUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRSubscriber} which uses
+ * {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
+
+ private final DMaaPMRSubscriberConfig subscriberConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final URI subscriberUri;
+ private final Date subscriberCreationTime;
+
+ @Inject
+ public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
+ CloseableHttpClient closeableHttpClient) {
+ this.subscriberConfig = subscriberConfig;
+ this.closeableHttpClient = closeableHttpClient;
+ this.subscriberUri = createSubscriberURI(subscriberConfig);
+ this.subscriberCreationTime = new Date();
+ }
+
+ @Override
+ public DMaaPMRSubscriberResponse fetchMessages() {
+
+ final String userName = (subscriberConfig.getUserName().equals("null")) ? null : subscriberConfig.getUserName();
+ final String userPassword = (subscriberConfig.getUserPassword().equals("null")) ? null
+ : subscriberConfig.getUserPassword();
+ final HttpGet getRequest = new HttpGet(subscriberUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
+ }
+
+ try {
+
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+
+ List<String> fetchedMessages = new LinkedList<>();
+ String responseMessage = responseBody;
+
+ // if messages were published successfully, return successful response
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ if (responseBody != null) {
+ fetchedMessages = convertJsonToStringMessages(responseBody);
+ responseMessage = "Messages Fetched Successfully";
+ } else {
+ responseMessage = "DMaaP Response Body had no messages";
+ }
+ } else {
+ LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, "
+ + "DMaaP Response Body: {}", responseCode, responseBody);
+ }
+
+ return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
+
+ } catch (IOException e) {
+
+ final String errorMessage = format("IO Exception while fetching messages from DMaaP Topic. Exception %s",
+ e);
+ throw new DMaapException(errorMessage, LOG, e);
+ }
+
+ }
+
+ @Override
+ public Date getSubscriberCreationTime() {
+ return new Date(subscriberCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeableHttpClient.close();
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java
new file mode 100644
index 0000000..98b6b01
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java
@@ -0,0 +1,53 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import java.util.List;
+
+/**
+ * <p>
+ * Contract for all DMaaP MR Subscriber Responses
+ * </p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRSubscriberResponse {
+
+ /**
+ * Gets HTTP Response Code
+ *
+ * @return HTTP Response code as String
+ */
+ Integer getResponseCode();
+
+ /**
+ * Gets Response Message
+ *
+ * @return Response Message
+ */
+ String getResponseMessage();
+ /**
+ * Returns message fetched from DMaaP MR Topic
+ *
+ * @return collection of actual message retrieved from DMaaP MR Topic
+ */
+ List<String> getFetchedMessages();
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java
new file mode 100644
index 0000000..e318359
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java
@@ -0,0 +1,79 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.dmaap.MRSubcriber;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * <p>
+ * A simple implementation for {@link DMaaPMRSubscriberResponse}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberResponseImpl implements DMaaPMRSubscriberResponse {
+
+ private final Integer responseCode;
+ private final String responseMessage;
+ private final List<String> fetchedMessages;
+
+ public DMaaPMRSubscriberResponseImpl(@Nonnull Integer responseCode,
+ @Nonnull String responseMessage,
+ @Nullable List<String> fetchedMessages) {
+ this.responseCode = responseCode;
+ this.responseMessage = responseMessage;
+ this.fetchedMessages = fetchedMessages != null ? fetchedMessages : ImmutableList.<String>of();
+ }
+
+ public DMaaPMRSubscriberResponseImpl(Integer responseCode, String responseMessage) {
+ this(responseCode, responseMessage, null);
+ }
+
+ @Override
+ public Integer getResponseCode() {
+ return responseCode;
+ }
+
+ @Override
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ @Override
+ public List<String> getFetchedMessages() {
+ return unmodifiableList(fetchedMessages);
+ }
+
+ @Override
+ public String toString()
+ {
+
+ return responseMessage;
+
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java
deleted file mode 100644
index 796fe70..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.domain;
-
-import org.springframework.data.annotation.Id;
-
-/**
- * A domain wrapper class for saving the config file in Mongo DB
- *
- * @author kmalbari
- *
- */
-public class ConfigFileData {
-
-
- @Id private String id;
-
- private String xmlFileName;
-
- private String xmlContent;
-
- public String getXmlFileName() {
- return xmlFileName;
- }
-
- public void setXmlFileName(String xmlFileName) {
- this.xmlFileName = xmlFileName;
- }
-
- public String getXmlContent() {
- return xmlContent;
- }
-
- public void setXmlContent(String xmlContent) {
- this.xmlContent = xmlContent;
- }
-
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java
deleted file mode 100644
index 625b021..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.exception;
-
-/**
- * Exception when unable to connect to Config file Disk repository
- *
- * @author kmalbari
- *
- */
-public class ConfigFileReadException extends VesException {
-
- /**
- *
- */
- private static final long serialVersionUID = 414953072485703000L;
-
- public ConfigFileReadException(String exceptionMessage) {
- super(exceptionMessage);
- }
-
- public ConfigFileReadException(String exceptionMessage, Exception exception) {
- super(exceptionMessage, exception);
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
index 7055bc0..1f642b2 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java
@@ -28,9 +28,6 @@ package org.onap.universalvesadapter.exception;
*/
public class ConfigFileSmooksConversionException extends VesException {
- /**
- *
- */
private static final long serialVersionUID = 7128340575013771888L;
public ConfigFileSmooksConversionException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
index 7a35f83..3b30fdd 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java
@@ -19,17 +19,18 @@
*/
package org.onap.universalvesadapter.exception;
+import org.slf4j.Logger;
+
+
/**
* Exception generated when dealing with communication to DMaap MR API
*
* @author kmalbari
*
*/
-public class DMaapException extends VesException {
+public class DMaapException extends RuntimeException {
- /**
- *
- */
+
private static final long serialVersionUID = 7045766597511192878L;
public DMaapException(String string) {
@@ -39,6 +40,26 @@ public class DMaapException extends VesException {
public DMaapException(String string, Exception exception) {
super(string, exception);
}
+
+ /**
+ * @param message - Error Message for Exception
+ * @param cause - Actual Exception which caused {@link DMaapException}
+ */
+ public DMaapException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Creates and logs the DCAE Runtime Exception to given logger
+ *
+ * @param message - Error Message for Exception and logging
+ * @param logger - Logger used for logging exception
+ * @param cause - Actual exception which caused {@link DMaapException}
+ */
+ public DMaapException(String message, Logger logger, Throwable cause) {
+ super(message, cause);
+ logger.error(message);
+ }
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
index 3dfa034..b4ebcc5 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java
@@ -27,9 +27,6 @@ package org.onap.universalvesadapter.exception;
*/
public class MapperConfigException extends VesException {
- /**
- *
- */
private static final long serialVersionUID = -7876042513908918292L;
public MapperConfigException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
index fd11b89..12c74cb 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java
@@ -28,10 +28,6 @@ package org.onap.universalvesadapter.exception;
*/
public class VesException extends Exception {
-
- /**
- *
- */
private static final long serialVersionUID = -8549819066568432382L;
public VesException(String string) {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
index e34b98a..06d8249 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java
@@ -130,24 +130,7 @@ public class Evaluation {
public String toString() {
return new ToStringBuilder(this).append("operand", operand).append("field", field).append("value", value).append("datatype", datatype).append("lhs", lhs).append("rhs", rhs).append("additionalProperties", additionalProperties).toString();
}
- /*
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(field).append(additionalProperties).append(value).append(rhs).append(datatype).append(operand).append(lhs).toHashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if ((other instanceof Evaluation) == false) {
- return false;
- }
- Evaluation rhs = ((Evaluation) other);
- return new EqualsBuilder().append(field, rhs.field).append(additionalProperties, rhs.additionalProperties).append(value, rhs.value).append(rhs, rhs.rhs).append(datatype, rhs.datatype).append(operand, rhs.operand).append(lhs, rhs.lhs).isEquals();
- }
-*/
+
@Override
public int hashCode() {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
index 1e6006a..43766ef 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java
@@ -35,11 +35,6 @@ import org.springframework.stereotype.Component;
@Component
public class AdapterService {
- /*
- * @Autowired private UniversalEventAdapter snmpTrapEventAdapter; public
- * GenericAdapter identifyIncomingJsonFormatAndReturnAdapter() { return
- * snmpTrapEventAdapter; }
- */
/**
* Identifies eventype by parsing the incoming json file.
@@ -52,7 +47,6 @@ public class AdapterService {
*/
public String identifyEventTypeFromIncomingJson(String incomingJsonString) throws MapperConfigException {
- // TODO A proper logic to identify diffeent events is needed here
return MapperConfigUtils.checkIncomingJsonForMatchingDomain(incomingJsonString);
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java
deleted file mode 100644
index bf45a1b..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-
-/**
- * A contract defined for services that will handle the operations of config file.
- *
- * @author kmalbari
- *
- */
-public interface ConfigFileService {
-
- /**
- * Returns the config file data.
- *
- * @param fileName file name
- * @return config file content
- * @throws ConfigFileReadException if unable to read config file
- */
- String readConfigFile(String fileName) throws ConfigFileReadException;
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
index e463a28..04f333e 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
@@ -19,226 +19,96 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration;
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.exception.VesException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- *
- * This service will handle all the communication with the DMaap MR API
- *
- *
- * @author kmalbari
- *
- */
@Component
public class DMaapService {
- private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private DMaapMrUrlConfiguration dmaapMrUrlObject;
-
- private MRConsumer consumer;
+ private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
+ private static List<String> list = new LinkedList<String>();
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+
+ /**
+ * It fetches events from DMaap in JSON, transforms JSON to VES format and
+ * publishes it to outgoing DMaap MR Topic
+ *
+ * @param DMaaPMRSubscriber,DMaaPMRPublisher
+ * @return
+ */
+ public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater)
+ throws InterruptedException {
+ LOGGER.info("fetch and publish from and to Dmaap started");
+
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) {
+ list.add(incomingJsonString);
+
+ }
+
+ if (list.size() == 0) {
+ Thread.sleep(2000);
+ }
+ LOGGER.debug("number of messages to be converted :{}", list.size());
+
+ if (list.size() != 0) {
+ String val = ((LinkedList<String>) list).removeFirst();
+ List<String> messages = new ArrayList<>();
+ String vesEvent = processReceivedJson(val);
+ if (!(vesEvent.isEmpty() || vesEvent.equals(null) || vesEvent.equals(""))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+ LOGGER.info("Message successfully published to DMaaP Topic");
+ }
+
+ }
+
+ }
+ }
+ }
+
+ /**
+ * It finds mapping file for received json, transforms json to VES format
+ *
+ * @param incomingJsonString
+ * @return
+ */
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+ /*
+ * For Future events String eventType =
+ * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
+ *
+ * LOGGER.debug("Event identified as " + eventType);
+ */
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp");
+
+ } catch (VesException exception) {
+ LOGGER.error("Received exception : " + exception.getMessage(), exception);
+ LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ LOGGER.error("Received exception : ", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
- private MRBatchingPublisher publisher;
-
- private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>();
-
- /**
- * Adds message to outgoing queue that will be sent to DMaaP topic.
- *
- * @param message outbound message in VES format
- */
- public void addMessageInOutgoingQueue(String message) {
- if (null != message && !"".equals(message)) {
- outgoingMessageQueue.add(message);
- eLOGGER.debug("Added message to outgoing queue " + message);
- }
- }
-
-
-
- /**
- * reads the messages on DMaap MR Topic
- *
- * @return iterable of messages that will be received on DMaap MR Topic
- *
- * @throws DMaapException
- */
- public Iterable<String> consumeFromDMaap() throws DMaapException{
- if(null == consumer){
- try {
- consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties());
- eLOGGER.debug("Created consumer");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception);
- }
- }
-
- try {
- eLOGGER.debug("Returning result fetched by consumer");
- return consumer.fetch();
- } catch (Exception exception) {
- throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception);
- }
-// return () -> Collections.emptyIterator();
-
- }
-
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap() throws DMaapException{
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Create a publisher now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- for(String message : outgoingMessageQueue){
- try {
- publisher.send(message);
- eLOGGER.debug("Sending message to DMaaP :-> " + message );
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- }
- else
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
-
- }
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap(String outgoingMessage) throws DMaapException{
- if(null == publisher){
- synchronized(publisher){
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Publisher created now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- }
- }
- try {
- publisher.send(outgoingMessage);
- eLOGGER.debug("Sent outgoing message " + outgoingMessage);
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- } else{
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
- }
-
- }
-
-
- /**
- * for local testing only
- * @return
- * @throws DMaapException
- */
- /*public String consume() throws DMaapException {
-
- URL url;
- StringBuffer incomingJson = null;
- incomingJson = new StringBuffer();
- try {
- url = new URL(dmaapMrUrlObject.getUrl());
-
- //open the connection to the above URL.
- URLConnection urlcon = url.openConnection();
-
- Map<String, List<String>> header = urlcon.getHeaderFields();
-
- //print all the fields along with their value.
- for (Map.Entry<String, List<String>> mp : header.entrySet()) {
- eLOGGER.debug(mp.getKey() + " : ");
- eLOGGER.debug(mp.getValue().toString());
- }
- eLOGGER.debug("Complete source code of the URL is-");
- eLOGGER.debug("---------------------------------");
-
- //get the inputstream of the open connection.
- BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream()));
- String tempString;
- //print the source code line by line.
- while ((tempString = br.readLine()) != null) {
- eLOGGER.debug(tempString);
- incomingJson.append(tempString);
- }
-
- } catch (MalformedURLException exception) {
- throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception);
- } catch (IOException exception) {
- throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception);
- }
- return incomingJson.toString();
- }*/
-
-
-
-
-
-
-
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java
deleted file mode 100644
index 7c05ced..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.onap.universalvesadapter.configs.DiskRepoConfiguration;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * Implementation of {@code ConfigFileService} using disk repository.
- *
- * @author kmalbari
- *
- */
-@Component
-public class DiskRepoConfigFileService implements ConfigFileService {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private DiskRepoConfiguration diskRepoConfiguration;
-
- private RestTemplate restTemplate;
-
- private URI uri = null;
-
- /* (non-Javadoc)
- * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String)
- */
- @Override
- public String readConfigFile(String fileName) throws ConfigFileReadException {
- logger.debug("Reading config file for " + fileName);
- if (null == uri) {
- try {
- uri = new URI(diskRepoConfiguration.getFileRepositoryUrl() + fileName);
- logger.debug("Read URI for " + fileName);
- } catch (URISyntaxException exception) {
- throw new ConfigFileReadException("Unable to read config file for file "
- + fileName + "\n Reason : " + exception.getMessage(), exception);
- }
- }
- logger.debug("Calling file repo service for URI" + uri);
- ResponseEntity<String> fileDataEntity = getRestTemplate().getForEntity(uri, String.class);
- logger.debug("Call completed successfully");
- return fileDataEntity.getBody();
- }
-
- /**
- * Instantiates the instance if null and returns it.
- *
- * @return {@code RestTemplate} instance
- */
- private RestTemplate getRestTemplate(){
-
- if (null == restTemplate) {
- restTemplate = new RestTemplate();
- }
-
- return restTemplate;
- }
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java
deleted file mode 100644
index 77769f5..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import org.onap.universalvesadapter.domain.ConfigFileData;
-import org.springframework.stereotype.Component;
-
-/**
- * Service to use mongo db as config file repository
- *
- * @author kmalbari
- *
- */
-@Component
-public class MongoDbConfigFileService implements ConfigFileService {
-
- /* (non-Javadoc)
- * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String)
- */
- public String readConfigFile(String configFileName){
- //HERE CONFIG FILE DATA WOULD COME FROM MONGO DB
- ConfigFileData configFileData = new ConfigFileData();
- configFileData.setXmlFileName(configFileName);
- configFileData.setXmlContent("<?xml version=\"1.0\" encoding=\"UTF-8\"?> "
- + "<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" "
- + "xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" "
- + " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> "
- + " <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> "
- + " </json:reader> "
- + "</smooks-resource-list>");
- return configFileData.getXmlContent();
- }
-
-
-}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
new file mode 100644
index 0000000..f92511e
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.service;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Component;
+
+//AdapterInitializer
+@Component
+public class VESAdapterInitializer implements CommandLineRunner, Ordered {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VESAdapterInitializer.class);
+ @Value("${spring.datasource.url}")
+ String dBurl;
+ @Value("${spring.datasource.username}")
+ String user;
+ @Value("${spring.datasource.password}")
+ String pwd;
+
+ private static Map<String, String> mappingFiles = new HashMap<String, String>();
+ private static Map<String, String> env;
+ private static String url;
+ public static String retString;
+ public static String retCBSString;
+
+ @Override
+ public void run(String... args) throws Exception {
+
+ fetchMappingFile();
+ getconsul();
+
+ }
+
+ private void getconsul() {
+
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ LOGGER.info(entry.getKey() + ":" + entry.getValue());
+ }
+
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+ LOGGER.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ retString = executecurl(url);
+
+ } else {
+
+
+
+ LOGGER.info(">>>Static configuration to be used");
+
+
+ }
+
+ }
+
+ private String executecurl(String url) {
+
+ String[] command = { "curl", "-v", url };
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ LOGGER.info(result);
+
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ LOGGER.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
+
+ }
+
+ public void fetchMappingFile() {
+ try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {
+ LOGGER.info("Retrieving data from DB");
+ PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file");
+ ResultSet rs = pstmt.executeQuery();
+ // parsing the column each time is a linear search
+ int column1Pos = rs.findColumn("enterpriseid");
+ int column2Pos = rs.findColumn("mappingfilecontents");
+ String hexString;
+ while (rs.next()) {
+ String column1 = rs.getString(column1Pos);
+ String column2 = rs.getString(column2Pos);
+ hexString = column2.substring(2);
+ byte[] bytes = Hex.decodeHex(hexString.toCharArray());
+ String data = new String(bytes, "UTF-8");
+ mappingFiles.put(column1, data);
+ }
+ LOGGER.info("DB Initialization Completed..." + mappingFiles.size());
+ } catch (Exception e) {
+ LOGGER.error("Error occured due to :" + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ public static Map<String, String> getMappingFiles() {
+ return mappingFiles;
+ }
+
+ public static void setMappingFiles(Map<String, String> mappingFiles) {
+ VESAdapterInitializer.mappingFiles = mappingFiles;
+ }
+
+ @Override
+ public int getOrder() {
+ return 1;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 112d1d6..ed590a8 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,28 +19,14 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-
-import javax.annotation.Resource;
-
-import org.milyn.io.FileUtils;
-import org.onap.universalvesadapter.adapter.GenericAdapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
-import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-import org.onap.universalvesadapter.utils.ParallelTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
-import org.springframework.util.FileCopyUtils;
/**
* Service that starts the universal ves adapter module to listen for events
@@ -50,128 +36,51 @@ import org.springframework.util.FileCopyUtils;
*/
@Component
public class VesService {
-
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- private boolean isRunning = true;
-
- @Autowired
- private ConfigurableApplicationContext ctx;
-
- @Autowired
- private DMaapService dmaapService;
-
- @Autowired
- private AdapterService adapterService;
-
- @Resource(name = "universalEventAdapter")
- private GenericAdapter eventAdapter;
- @Value("${messagesInBatch}")
- private int messagesInBatch;
+ private final Logger LOGGER = LoggerFactory.getLogger(VesService.class);
+
+ private boolean isRunning = true;
+
+ @Autowired
+ private DMaapService dmaapService;
+
+ @Autowired
+ private Creator creator;
+
- @Value("${messagesInTimeInterval}")
- private long messagesInTimeInverval;
-
- @Value("${mapperConfig.file}")
- private String mapperConfigFile;
-
- /*public void start(){
-
- String incomingJsonString = dmaapService.consume();
- if(!"".equals(incomingJsonString)){
- GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter();
- String outgoingJsonString = eventAdapter.transform(incomingJsonString);
- System.out.println(outgoingJsonString);
- }
- }*/
-
-
- /**
- * method triggers universal ves adapter module.
- */
- public void start() {
-
- try {
- String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile));
- MapperConfigUtils.readMapperConfigFile(mappingConfigFileData);
-
-
- ParallelTasks parallelTasks = new ParallelTasks();
- while (isRunning) {
- int processingNumberOfMessage = 0;
- long start = System.currentTimeMillis();
- for (String incomingJsonString : dmaapService.consumeFromDMaap()) {
- parallelTasks.add(() -> processReceivedJson(incomingJsonString));
- processingNumberOfMessage++;
- if (processingNumberOfMessage == messagesInBatch
- || (System.currentTimeMillis() - start) > messagesInTimeInverval) {
- processingNumberOfMessage = 0;
- start = System.currentTimeMillis();
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
- }
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
-
- /*String incomingJsonString = "";
- incomingJsonString = dmaapService.consume();
- processReceivedJson(incomingJsonString);*/
- } catch (Exception exception) {
- LOGGER.error("Reported exception : " + exception.getMessage(), exception);
- }
- }
+ /**
+ * method triggers universal VES adapter module.
+ */
+ public void start() throws MapperConfigException {
+ LOGGER.debug("Creating Subcriber and Publisher with creator.............");
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber();
- /**
- * It finds mapping file for received json, transforms json to VES format
- * and publishes it to outgoing DMaap MR Topic
- *
- * @param incomingJsonString
- */
- private void processReceivedJson(String incomingJsonString){
- LOGGER.debug("Received incoming message : " + incomingJsonString);
- if (!"".equals(incomingJsonString)) {
- String eventType;
- try {
- eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
-
- LOGGER.debug("Event identified as " + eventType);
- String outgoingJsonString;
- outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType);
- LOGGER.debug("Output VES json to be sent " + outgoingJsonString);
-
-// dmaapService.addMessageInOutgoingQueue(outgoingJsonString);
-// LOGGER.debug("Added message in outgoing Queue ");
-
- dmaapService.publishToDMaap(outgoingJsonString);
- LOGGER.debug("Sent message in outgoing Queue ");
-
-
- } catch (VesException exception) {
- LOGGER.error("Received exception : " + exception.getMessage(), exception);
-
- //TODO KKM : Do we wish to continue the application with same exception in every thread??
- LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
- ctx.close();
- }
- }
- }
-
- /**
- * method stops universal ves adapter module
- */
- public void stop() {
-
- isRunning = false;
- }
+ DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
+
+ // Create subscriber & publisher thread
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOGGER.debug("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
+ dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Start subscriber & publisher thread
+ t1.setName("SNMP-COLLECTOR");
+ t1.start();
+
+ }
+
+ /**
+ * method stops universal ves adapter module
+ */
+ public void stop() {
+ isRunning = false;
+ }
}
+
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
new file mode 100644
index 0000000..34bbc8e
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
@@ -0,0 +1,288 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+package org.onap.universalvesadapter.utils;
+
+import javax.validation.constraints.NotEmpty;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.stereotype.Component;
+
+@Component
+@PropertySource(value = {"classpath:application.properties","classpath:DMaapMR.properties"})
+@ConfigurationProperties
+public class DmaapConfig {
+
+ @Value("${mr.hostname}")
+ @NotEmpty
+ private String hostname;
+
+ // default port number
+ @Value("${mr.DEFAULT_PORT_NUMBER}")
+ @NotEmpty
+ private int DEFAULT_PORT_NUMBER;
+
+ // default to no username
+ @Value("${mr.DEFAULT_USER_NAME}")
+ private String DEFAULT_USER_NAME;
+
+
+ //defaults to no userPassword
+ @Value("${mr.DEFAULT_USER_PASSWORD}")
+ private String DEFAULT_USER_PASSWORD;
+
+ //defaults to using https protocol
+ @Value("${mr.DEFAULT_PROTOCOL}")
+ @NotEmpty
+ private String DEFAULT_PROTOCOL;
+
+ //defaults to json content type
+ @Value("${mr.DEFAULT_CONTENT_TYPE}")
+ @NotEmpty
+ private String DEFAULT_CONTENT_TYPE;
+
+ @Value("${mr.DMAAP_URI_PATH_PREFIX}")
+ @NotEmpty
+ private String DMAAP_URI_PATH_PREFIX;
+
+ @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}")
+ @NotEmpty
+ private String DMAAP_DEFAULT_CONSUMER_ID;
+
+ @Value("${mr.DMAAP_GROUP_PREFIX}")
+ @NotEmpty
+ private String DMAAP_GROUP_PREFIX;
+
+ // Publisher Constants
+
+ //Dmaap Publisher Topic
+ @Value("${mr.publisher.topic}")
+ @NotEmpty
+ private String publisherTopic;
+
+ //disable batching by default
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+
+ //default recovery messages size
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+
+//number of retries when flushing messages
+ @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+
+ //delay in retrying for flushing messages
+ @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+
+ // Subscriber Constants
+
+ //Dmaap Subcriber Topic
+ @Value("${mr.subscriber.topic}")
+ @NotEmpty
+ private String subscriberTopic;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
+ @NotEmpty
+ private int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
+ @NotEmpty
+ private int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+
+ @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
+ @NotEmpty
+ private String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+
+ @Value("${mr.subcriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+
+ @Value("${mr.subcriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getDEFAULT_PORT_NUMBER() {
+ return DEFAULT_PORT_NUMBER;
+ }
+
+ public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) {
+ DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER;
+ }
+
+ public String getDEFAULT_USER_NAME() {
+ return DEFAULT_USER_NAME;
+ }
+
+ public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) {
+ DEFAULT_USER_NAME = dEFAULT_USER_NAME;
+ }
+
+ public String getDEFAULT_USER_PASSWORD() {
+ return DEFAULT_USER_PASSWORD;
+ }
+
+ public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) {
+ DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD;
+ }
+
+ public String getDEFAULT_PROTOCOL() {
+ return DEFAULT_PROTOCOL;
+ }
+
+ public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) {
+ DEFAULT_PROTOCOL = dEFAULT_PROTOCOL;
+ }
+
+ public String getDEFAULT_CONTENT_TYPE() {
+ return DEFAULT_CONTENT_TYPE;
+ }
+
+ public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) {
+ DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE;
+ }
+
+ public String getDMAAP_URI_PATH_PREFIX() {
+ return DMAAP_URI_PATH_PREFIX;
+ }
+
+ public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) {
+ DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX;
+ }
+
+ public String getDMAAP_DEFAULT_CONSUMER_ID() {
+ return DMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) {
+ DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public String getDMAAP_GROUP_PREFIX() {
+ return DMAAP_GROUP_PREFIX;
+ }
+
+ public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) {
+ DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX;
+ }
+
+ public String getPublisherTopic() {
+ return publisherTopic;
+ }
+
+ public void setPublisherTopic(String publisherTopic) {
+ this.publisherTopic = publisherTopic;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(
+ int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public String getSubscriberTopic() {
+ return subscriberTopic;
+ }
+
+ public void setSubscriberTopic(String subscriberTopic) {
+ this.subscriberTopic = subscriberTopic;
+ }
+
+ public int getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
+ return subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
+ this.subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public int getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
+ return subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
+ this.subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public String getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
+ return subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public void setSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
+ this.subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public String getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
+ return subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public void setSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
+ this.subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public String getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
+ return subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+ public void setSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
+ this.subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java
new file mode 100644
index 0000000..02632f4
--- /dev/null
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java
@@ -0,0 +1,62 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.utils;
+
+/**
+ * Contains common utils to check HTTP Related Utils
+ *
+ * @author Rajiv Singla . Creation Date: 11/2/2016.
+ */
+public abstract class HTTPUtils {
+
+ /**
+ * HTTP Status code for successful HTTP call
+ */
+ public static final Integer HTTP_SUCCESS_STATUS_CODE = 200;
+
+ /**
+ * HTTP Response code when request has been accepted for processing, but the processing has not been completed
+ */
+ public static final Integer HTTP_ACCEPTED_RESPONSE_CODE = 202;
+
+ /**
+ * HTTP Response code when there is no content
+ */
+ public static final Integer HTTP_NO_CONTENT_RESPONSE_CODE = 204;
+
+
+ public static final String JSON_APPLICATION_TYPE = "application/json";
+
+
+ private HTTPUtils() {
+
+ }
+
+ /**
+ * Checks if HTTP Status code is less than or equal to 200 but less then 300
+ *
+ * @param statusCode http status code
+ * @return true if response code between 200 and 300
+ */
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
index 4e67ed6..3d1907a 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java
@@ -19,18 +19,22 @@
*/
package org.onap.universalvesadapter.utils;
-import com.att.aft.dme2.internal.apache.commons.lang3.EnumUtils;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
+
import org.onap.universalvesadapter.exception.MapperConfigException;
import org.onap.universalvesadapter.mappingconfig.Entry;
import org.onap.universalvesadapter.mappingconfig.Evaluation;
import org.onap.universalvesadapter.mappingconfig.MapperConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
/**
+ * This class will be used in Future for different telemetry data:
* This class will be utility class to read the mapper config file and parse the
* config to prepare the grammar to detect the incoming json's event type.
*
@@ -39,6 +43,7 @@ import org.onap.universalvesadapter.mappingconfig.MapperConfig;
*/
public class MapperConfigUtils {
+ private final Logger LOGGER = LoggerFactory.getLogger(MapperConfigUtils.class);
private static Set<Entry> entries = new TreeSet<>((o1, o2) -> o1.getPriority().compareTo(o2.getPriority()));
private enum JoinOperator {
@@ -212,6 +217,7 @@ public class MapperConfigUtils {
exception);
}
System.out.println("Read config file content into :" + config);
+
if (null != config) {
entries.addAll(config.getEntries());
} else {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java
deleted file mode 100644
index 45fdf96..0000000
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- *
- * Utility class to execute parallel tasks
- *
- * @author kmalbari
- *
- */
-public class ParallelTasks
-{
- private final Collection<Runnable> tasks = new ArrayList<Runnable>();
-
- public ParallelTasks()
- {
- }
-
- /**
- *
- * Add task to be executed in parallel
- *
- * @param task
- */
- public void add(final Runnable task)
- {
- tasks.add(task);
- }
-
- /**
- * starts all the added tasks in parallel
- *
- * @throws InterruptedException
- */
- public void startParallelTasks() throws InterruptedException
- {
- final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
- .availableProcessors());
- try
- {
- final CountDownLatch latch = new CountDownLatch(tasks.size());
- for (final Runnable task : tasks)
- threads.execute(new Runnable() {
- public void run()
- {
- try
- {
- task.run();
- }
- finally
- {
- latch.countDown();
- }
- }
- });
- latch.await();
- }
- finally
- {
- threads.shutdown();
- }
- }
-} \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
index 8c17dc2..1c38783 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
* @author kmalbari
*
*/
+
+
public class SmooksUtils {
@@ -53,23 +55,19 @@ public class SmooksUtils {
*/
public static VesEvent getTransformedObjectForInput(Smooks smooks, String incomingJsonString) {
- LOGGER.debug("Transforming json " + incomingJsonString);
+ LOGGER.info("Transforming incoming json " );
ExecutionContext executionContext = smooks.createExecutionContext();
- LOGGER.debug("Context created");
+ LOGGER.info("Context created");
Locale defaultLocale = Locale.getDefault();
Locale.setDefault(new Locale("en", "IE"));
StringResult result = new StringResult();
- // Configure the execution context to generate a report...
-// executionContext.setEventListener(new HtmlReportGenerator("target/report/report.html"));
-
- // Filter the input message to the outputWriter, using the execution context...
smooks.filterSource(executionContext, new StreamSource(new ByteArrayInputStream(incomingJsonString.getBytes(StandardCharsets.UTF_8))), result);
- LOGGER.debug("Transformed incoming json now");
+
Locale.setDefault(defaultLocale);
VesEvent vesEvent = (VesEvent) executionContext.getBeanContext().getBean("vesEvent");
- LOGGER.debug("Converted vesEvent from incoming json");
+ LOGGER.debug("consversion successful to VES Event");
return vesEvent;
}