aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java
diff options
context:
space:
mode:
authorPooja03 <pm00501616@techmahindra.com>2019-04-15 20:40:26 +0530
committerPooja03 <pm00501616@techmahindra.com>2019-04-15 20:40:26 +0530
commit8aa9abf52d06288025735d922d6f984c5b30d9e1 (patch)
treebacb7aaa9e725e13c819d6f1eb33cfe3c9ce33e3 /UniversalVesAdapter/src/main/java
parent4cc3807634313eabcb859191108201efc386b7d5 (diff)
Cloudify blueprint for VES mapper
Adding Cloudify blueprint for VES mapper and some updates on code Change-Id: Idcbd9ec080717a80d04263a4baa6e4de9ce143c9 Issue-ID: DCAEGEN2-1176 Signed-off-by: Pooja03 <pm00501616@techmahindra.com>
Diffstat (limited to 'UniversalVesAdapter/src/main/java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java5
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java303
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java505
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java638
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java178
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java552
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java384
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java260
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java559
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java419
10 files changed, 1972 insertions, 1831 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java b/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java
index afc63f8..b4760fd 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java
@@ -58,6 +58,11 @@ public class AlarmAdditionalInformation {
public void setAdditionalProperty(String name, String value) {
this.additionalProperties.put(name, value);
}
+ @JsonAnySetter
+ public void setAdditionalProperties(Map<String, String> h) {
+ this.additionalProperties =h;
+ }
+
@Override
public int hashCode() {
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
index 09e85a0..483b19b 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java
@@ -1,22 +1,23 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018-2019 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2018-2019 TechMahindra
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
package org.onap.universalvesadapter.adapter;
import java.io.ByteArrayInputStream;
@@ -29,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.milyn.Smooks;
-import org.onap.dcaegen2.ves.domain.ves70.VesEvent;
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
import org.onap.universalvesadapter.exception.VesException;
import org.onap.universalvesadapter.service.VESAdapterInitializer;
@@ -58,128 +58,143 @@ import com.google.gson.JsonSyntaxException;
@Component
public class UniversalEventAdapter implements GenericAdapter {
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
-
- @Value("${defaultConfigFilelocation}")
- private String defaultConfigFilelocation;
- private String collectorIdentifierValue;
- private String collectorIdentifierKey;
- private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>();
-
- public UniversalEventAdapter() {
-
- }
-
- /**
- * transforms JSON to VES format and and returns the ves Event
- *
- * @param IncomingJason,eventType
- * @return ves Event
- */
- @Override
- public String transform(String incomingJsonString)
- throws ConfigFileSmooksConversionException, VesException {
- String result = "";
- String configFileData;
-
- String identifier[]= CollectorConfigPropertyRetrival.getProperyArray("identifier",defaultConfigFilelocation );
- String defaultMappingFile="defaultMappingFile-"+Thread.currentThread().getName();
- try {
-
- Gson gson = new Gson();
- JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class);
-
- JsonElement results;
- for(int i=0;i<identifier.length;i++)
- {
- JsonObject obj;
- if((obj=keyObject(body,identifier[i])).has(identifier[i]))
- {
- collectorIdentifierKey=identifier[i];
- results=obj.get(identifier[i]);
- collectorIdentifierValue=results.getAsString();
-
- }
-
- }
- //collectorIdentifierValue = collectorIdentifierValue.substring(0, collectorIdentifierValue.length() - 4);
- if(collectorIdentifierKey.equals("notify OID"))
- {
- collectorIdentifierValue = collectorIdentifierValue.substring(0, collectorIdentifierValue.length() - 4);
- }
-
-
- if (VESAdapterInitializer.getMappingFiles().containsKey(collectorIdentifierValue)) {
- configFileData = VESAdapterInitializer.getMappingFiles().get(collectorIdentifierValue);
- debugLogger.debug("Using Mapping file as Mapping file is available for collector identifier:{}",collectorIdentifierValue);
-
- } else {
-
- configFileData = VESAdapterInitializer.getMappingFiles().get(defaultMappingFile);
-
- debugLogger.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",collectorIdentifierValue);
- }
-
- Smooks smooksTemp = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8)));
- eventToSmooksMapping.put(collectorIdentifierKey, smooksTemp);
-
- Object vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,incomingJsonString);
- debugLogger.info("Incoming json transformed to VES format successfully:"+Thread.currentThread().getName());
- ObjectMapper objectMapper = new ObjectMapper();
- result = objectMapper.writeValueAsString(vesEvent);
- debugLogger.info("Serialized VES json");
- } catch (JsonProcessingException exception) {
- throw new VesException("Unable to convert pojo to VES format, Reason :{}", exception);
- } catch (SAXException | IOException exception) {
- //Invalid Mapping file
- exception.printStackTrace();
- errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception.getMessage());
-
- } catch (JsonSyntaxException exception) {
- // Invalid Trap
- errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}", incomingJsonString, exception);
- }catch (JsonParseException exception) {
- // Invalid Trap
- errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}", incomingJsonString, exception);
- }
- catch (RuntimeException exception) {
-
- exception.printStackTrace();
- errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, exception.getMessage());
-
- }
- return result;
- }
-
- /**
- * Closes all open smooks' instances before bean is destroyed
- */
- @PreDestroy
- public void destroy() {
- for (Smooks smooks : eventToSmooksMapping.values())
- smooks.close();
- debugLogger.warn("All Smooks objects closed");
- }
-
- public JsonObject keyObject(JsonObject object, String searchedKey) {
- boolean exists = object.has(searchedKey);
- JsonObject jsonObject = object;
-
- if(!exists) {
- Iterator<?> keys = object.keySet().iterator();
- while( keys.hasNext() ) {
- String key = (String)keys.next();
- if ( object.get(key) instanceof JsonObject ) {
-
- jsonObject=(JsonObject) object.get(key);
- JsonObject obj = keyObject(jsonObject, searchedKey);
- exists = obj.has(searchedKey);
- }
- }
- }
-
- return jsonObject;
- }
-
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+
+ @Value("${defaultConfigFilelocation}")
+ private String defaultConfigFilelocation;
+ private String collectorIdentifierValue;
+ private String collectorIdentifierKey;
+ private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>();
+
+ public UniversalEventAdapter() {
+
+ }
+
+ /**
+ * transforms JSON to VES format and and returns the ves Event
+ *
+ * @param IncomingJason,eventType
+ * @return ves Event
+ */
+ @Override
+ public String transform(String incomingJsonString)
+ throws ConfigFileSmooksConversionException, VesException {
+ String result = "";
+ String configFileData;
+
+ String identifier[] = CollectorConfigPropertyRetrival.getProperyArray("identifier",
+ defaultConfigFilelocation);
+ String defaultMappingFile =
+ "defaultMappingFile-" + Thread.currentThread().getName();
+ try {
+
+ Gson gson = new Gson();
+ JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class);
+
+ JsonElement results;
+ for (int i = 0; i < identifier.length; i++) {
+ JsonObject obj;
+ if ((obj = keyObject(body, identifier[i])).has(identifier[i])) {
+ collectorIdentifierKey = identifier[i];
+ results = obj.get(identifier[i]);
+ collectorIdentifierValue = results.getAsString();
+
+ }
+
+ }
+ // collectorIdentifierValue = collectorIdentifierValue.substring(0,
+ // collectorIdentifierValue.length() - 4);
+ if (collectorIdentifierKey.equals("notify OID")) {
+ collectorIdentifierValue = collectorIdentifierValue.substring(0,
+ collectorIdentifierValue.length() - 4);
+ }
+
+
+ if (VESAdapterInitializer.getMappingFiles()
+ .containsKey(collectorIdentifierValue)) {
+ configFileData = VESAdapterInitializer.getMappingFiles()
+ .get(collectorIdentifierValue);
+ debugLogger.debug(
+ "Using Mapping file as Mapping file is available for collector identifier:{}",
+ collectorIdentifierValue);
+
+ } else {
+
+ configFileData = VESAdapterInitializer.getMappingFiles()
+ .get(defaultMappingFile);
+
+ debugLogger.debug(
+ "Using Default Mapping file as Mapping file is not available for Enterprise Id / identifer ID:{}",
+ collectorIdentifierValue);
+ }
+
+ Smooks smooksTemp = new Smooks(new ByteArrayInputStream(
+ configFileData.getBytes(StandardCharsets.UTF_8)));
+ eventToSmooksMapping.put(collectorIdentifierKey, smooksTemp);
+
+ Object vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,
+ incomingJsonString);
+ debugLogger.info("Incoming json transformed to VES format successfully:"
+ + Thread.currentThread().getName());
+ ObjectMapper objectMapper = new ObjectMapper();
+ result = objectMapper.writeValueAsString(vesEvent);
+ debugLogger.info("Serialized VES json");
+ } catch (JsonProcessingException exception) {
+ throw new VesException("Unable to convert pojo to VES format, Reason :{}",
+ exception);
+ } catch (SAXException | IOException exception) {
+ // Invalid Mapping file
+ exception.printStackTrace();
+ errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString,
+ exception.getMessage());
+
+ } catch (JsonSyntaxException exception) {
+ // Invalid Trap
+ errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}",
+ incomingJsonString, exception);
+ } catch (JsonParseException exception) {
+ // Invalid Trap
+ errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}",
+ incomingJsonString, exception);
+ } catch (RuntimeException exception) {
+
+ exception.printStackTrace();
+ errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString,
+ exception.getMessage());
+
+ }
+ return result;
+ }
+
+ /**
+ * Closes all open smooks' instances before bean is destroyed
+ */
+ @PreDestroy
+ public void destroy() {
+ for (Smooks smooks : eventToSmooksMapping.values())
+ smooks.close();
+ debugLogger.warn("All Smooks objects closed");
+ }
+
+ public JsonObject keyObject(JsonObject object, String searchedKey) {
+ boolean exists = object.has(searchedKey);
+ JsonObject jsonObject = object;
+
+ if (!exists) {
+ Iterator<?> keys = object.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = (String) keys.next();
+ if (object.get(key) instanceof JsonObject) {
+
+ jsonObject = (JsonObject) object.get(key);
+ JsonObject obj = keyObject(jsonObject, searchedKey);
+ exists = obj.has(searchedKey);
+ }
+ }
+ }
+
+ return jsonObject;
+ }
+
}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
index 137e3af..4b15f55 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java
@@ -1,252 +1,253 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-
-//TO-DO Lisence Issue ASK to Kedar???????????
-package org.onap.universalvesadapter.configs;
-
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import org.onap.universalvesadapter.utils.DmaapConfig;
-import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
-import com.google.common.base.Objects;
-
-/**
- * <p>
- * Immutable DMaaP MR Configuration for DMaaP MR Publisher.
- * <p>
- * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber
- * Configuration
- * </p>
- * <p>
- *
- * @author Rajiv Singla . Creation Date: 10/12/2016.
- *
- */
-@ComponentScan
-public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig {
-
-
- /**
- * Publisher batching queue size
- */
- private int maxBatchSize;
-
- /**
- * Publisher Recovery Queue Size
- */
- private int maxRecoveryQueueSize;
-
- /**
- * Default uri path prefix
- */
- private String dmaapUriPathPrefix ;
-
- private DMaaPMRPublisherConfig(Builder builder) {
- this.hostName = builder.hostName;
- this.portNumber = builder.portNumber;
- this.topicName = builder.topicName;
- this.protocol = builder.protocol;
- this.userName = builder.userName;
- this.userPassword = builder.userPassword;
- this.contentType = builder.contentType;
- this.maxBatchSize = builder.maxBatchSize;
- this.maxRecoveryQueueSize = builder.maxRecoveryQueueSize;
- this.dmaapUriPathPrefix = builder.dmaapUriPathPrefix;
- }
-
- /**
- * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object
- */
- public static class Builder {
-
- private String hostName;
- private Integer portNumber;
- private String topicName;
- private String userName;
- private String userPassword;
- private String protocol;
- private String contentType;
- private int maxBatchSize;
- private int maxRecoveryQueueSize;
- private String dmaapUriPathPrefix ;
-
-
-
- public Builder(@Nonnull String hostName, @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
- this.hostName = hostName;
- this.topicName = topicName;
- // Default values
- this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
- this.userName = dmaapConfig.getDEFAULT_USER_NAME();
- this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
- this.protocol =dmaapConfig.getDEFAULT_PROTOCOL();
- this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
-
- this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE();
- this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE();
- this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
- }
-
- /**
- * Setup for custom host port number - Defaults to 80.
- *
- * @param portNumber
- * custom port number
- * @return Builder object itself for chaining
- */
- public Builder setPortNumber(@Nonnull Integer portNumber) {
- this.portNumber = portNumber;
- return this;
- }
-
- /**
- * Setup user name for authentication. If no username is provided authentication
- * will be disabled
- *
- * @param userName
- * user name for DMaaP Topic Authentication
- * @return Builder object itself for chaining
- */
- public Builder setUserName(@Nonnull String userName) {
- this.userName = userName;
- return this;
- }
-
- /**
- * Setup user password for authentication. If no password is provided
- * authentication will be disabled
- *
- * @param userPassword
- * user password for DMaaP Topic Authentication
- * @return Builder object itself for chaining
- */
- public Builder setUserPassword(@Nonnull String userPassword) {
- this.userPassword = userPassword;
- return this;
- }
-
- /**
- * Setup custom Publisher protocol - Defaults to https. Note: Only http and
- * https are currently supported.
- *
- * @param protocol
- * protocol e.g. https
- * @return Builder object itself for chaining
- */
- public Builder setProtocol(@Nonnull String protocol) {
- this.protocol = normalizeValidateProtocol(protocol);
- return this;
- }
-
- /**
- * Setup custom Publisher content-type - Defaults to application/json
- *
- * @param contentType
- * content type e.g. application/json
- * @return Builder object itself for chaining
- */
- public Builder setContentType(@Nonnull String contentType) {
- final String normalizedContentType = normalizeValidateContentType(contentType);
- this.contentType = normalizedContentType;
- return this;
- }
-
- /**
- * Setup custom Publisher Max Batch Size - Defaults to 100
- *
- * @param maxBatchSize
- * max Batch Size
- * @return Builder object itself for chaining
- */
- public Builder setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- return this;
- }
-
- /**
- * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold
- * messages temporarily in case DMaaP MR Publisher topic is not responding for
- * any reason. Defaults to 100,000
- *
- * @param maxRecoveryQueueSize
- * max recovery queue size
- * @return Builder object itself for chaining
- */
- public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) {
- this.maxRecoveryQueueSize = maxRecoveryQueueSize;
- return this;
- }
-
-
- /**
- * Creates immutable instance of {@link DMaaPMRPublisherConfig}
- *
- * @return Builds and returns thread safe, immutable
- * {@link DMaaPMRPublisherConfig} object
- */
- public DMaaPMRPublisherConfig build() {
- return new DMaaPMRPublisherConfig(this);
- }
-
- }
- public String getDmaapUriPathPrefix() {
- return dmaapUriPathPrefix;
- }
-
- /**
- * Returns max Publisher Batch Queue Size
- *
- * @return max Publisher Batch Queue size
- */
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- /**
- * Returns max Publisher Recovery Queue Size
- *
- * @return max Recovery Queue size
- */
- public int getMaxRecoveryQueueSize() {
- return maxRecoveryQueueSize;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o;
- return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize);
- }
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+
+//TO-DO Lisence Issue ASK to Kedar???????????
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for DMaaP MR Publisher.
+ * <p>
+ * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber
+ * Configuration
+ * </p>
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ *
+ */
+@ComponentScan
+public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig {
+
+
+ /**
+ * Publisher batching queue size
+ */
+ private int maxBatchSize;
+
+ /**
+ * Publisher Recovery Queue Size
+ */
+ private int maxRecoveryQueueSize;
+
+ /**
+ * Default uri path prefix
+ */
+ private String dmaapUriPathPrefix ;
+
+ private DMaaPMRPublisherConfig(Builder builder) {
+ this.hostName = builder.hostName;
+ this.portNumber = builder.portNumber;
+ this.topicName = builder.topicName;
+ this.protocol = builder.protocol;
+ this.userName = builder.userName;
+ this.userPassword = builder.userPassword;
+ this.contentType = builder.contentType;
+ this.maxBatchSize = builder.maxBatchSize;
+ this.maxRecoveryQueueSize = builder.maxRecoveryQueueSize;
+ this.dmaapUriPathPrefix = builder.dmaapUriPathPrefix;
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private int maxBatchSize;
+ private int maxRecoveryQueueSize;
+ private String dmaapUriPathPrefix ;
+
+
+
+ public Builder(@Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+this.topicName = topicName;
+this.hostName = dmaapConfig.getDmaaphost();
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ // Default values
+
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol =dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+
+ this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE();
+ this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE();
+ this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+ }
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber
+ * custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication
+ * will be disabled
+ *
+ * @param userName
+ * user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ /**
+ * Setup user password for authentication. If no password is provided
+ * authentication will be disabled
+ *
+ * @param userPassword
+ * user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher protocol - Defaults to https. Note: Only http and
+ * https are currently supported.
+ *
+ * @param protocol
+ * protocol e.g. https
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher content-type - Defaults to application/json
+ *
+ * @param contentType
+ * content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+ /**
+ * Setup custom Publisher Max Batch Size - Defaults to 100
+ *
+ * @param maxBatchSize
+ * max Batch Size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ /**
+ * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold
+ * messages temporarily in case DMaaP MR Publisher topic is not responding for
+ * any reason. Defaults to 100,000
+ *
+ * @param maxRecoveryQueueSize
+ * max recovery queue size
+ * @return Builder object itself for chaining
+ */
+ public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) {
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ return this;
+ }
+
+
+ /**
+ * Creates immutable instance of {@link DMaaPMRPublisherConfig}
+ *
+ * @return Builds and returns thread safe, immutable
+ * {@link DMaaPMRPublisherConfig} object
+ */
+ public DMaaPMRPublisherConfig build() {
+ return new DMaaPMRPublisherConfig(this);
+ }
+
+ }
+ public String getDmaapUriPathPrefix() {
+ return dmaapUriPathPrefix;
+ }
+
+ /**
+ * Returns max Publisher Batch Queue Size
+ *
+ * @return max Publisher Batch Queue size
+ */
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /**
+ * Returns max Publisher Recovery Queue Size
+ *
+ * @return max Recovery Queue size
+ */
+ public int getMaxRecoveryQueueSize() {
+ return maxRecoveryQueueSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o;
+ return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize);
+ }
+
+} \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
index 703684f..a00091a 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java
@@ -1,319 +1,319 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.onap.universalvesadapter.configs;
-
-import java.io.IOException;
-
-import javax.annotation.Nonnull;
-
-import org.onap.universalvesadapter.utils.DmaapConfig;
-
-import com.google.common.base.Objects;
-
-/**
- * <p>
- * Immutable DMaaP MR Configuration for Subscriber.
- * <p>
- * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration
- * <p>
- *
- * @author Rajiv Singla . Creation Date: 10/12/2016.
- */
-public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig {
-
- private final String consumerId;
- private final String consumerGroup;
- private final int timeoutMS;
- private final int messageLimit;
- private String timeoutMSParam;
- private String messageLimitParam;
- private String uriPrefix;
-
- public DMaaPMRSubscriberConfig(Builder builder) {
- this.hostName = builder.hostName;
- this.portNumber = builder.portNumber;
- this.topicName = builder.topicName;
- this.protocol = builder.protocol;
- this.userName = builder.userName;
- this.userPassword = builder.userPassword;
- this.contentType = builder.contentType;
- this.consumerId = builder.consumerId;
- this.consumerGroup = builder.consumerGroup;
- this.timeoutMS = builder.timeoutMS;
- this.messageLimit = builder.messageLimit;
- this.timeoutMSParam = builder.timeoutMSParam;
- this.messageLimitParam = builder.messageLimitParam;
- this.uriPrefix = builder.uriPreifix;
- }
-
- /**
- * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object
- */
- public static class Builder {
-
- private String hostName;
- private Integer portNumber;
- private String topicName;
- private String userName;
- private String userPassword;
- private String protocol;
- private String contentType;
- private String consumerId;
- private String consumerGroup;
- private int timeoutMS;
- private int messageLimit;
- private String timeoutMSParam;
- private String messageLimitParam;
- private String uriPreifix;
-
-
-
- public Builder(@Nonnull String hostName,
- @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
-
- // Required Values
- this.hostName = hostName;
- this.topicName = topicName;
- // Default values
- this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
- this.userName = dmaapConfig.getDEFAULT_USER_NAME();
- this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
- this.protocol = dmaapConfig.getDEFAULT_PROTOCOL();
- this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
- this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID();
- this.consumerGroup = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
- this.timeoutMS =dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
- this.messageLimit = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
- this.timeoutMSParam=dmaapConfig.getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
- this.messageLimitParam=dmaapConfig.getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
- this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
-
- }
-
-
- /**
- * Setup for custom host port number - Defaults to 80.
- *
- * @param portNumber custom port number
- * @return Builder object itself for chaining
- */
- public Builder setPortNumber(@Nonnull Integer portNumber) {
- this.portNumber = portNumber;
- return this;
- }
-
-
- /**
- * Setup user name for authentication. If no username is provided authentication will be disabled
- *
- * @param userName user name for DMaaP Topic Authentication
- * @return Builder object itself for chaining
- */
- public Builder setUserName(@Nonnull String userName) {
- this.userName = userName;
- return this;
- }
-
-
- /**
- * Setup user password for authentication. If no password is provided authentication will be disabled
- *
- * @param userPassword user password for DMaaP Topic Authentication
- * @return Builder object itself for chaining
- */
- public Builder setUserPassword(@Nonnull String userPassword) {
- this.userPassword = userPassword;
- return this;
- }
-
-
- /**
- * Setup custom Subscriber protocol - Defaults to https.
- * Note: Only http and https are currently supported.
- *
- * @param protocol protocol e.g. https or http
- * @return Builder object itself for chaining
- */
- public Builder setProtocol(@Nonnull String protocol) {
-
- this.protocol = normalizeValidateProtocol(protocol);
- return this;
- }
-
- /**
- * Setup custom Subscriber content-type - Defaults to application/json
- *
- * @param contentType content type e.g. application/json
- * @return Builder object itself for chaining
- */
- public Builder setContentType(@Nonnull String contentType) {
- final String normalizedContentType = normalizeValidateContentType(contentType);
- this.contentType = normalizedContentType;
- return this;
- }
-
-
- /**
- * Setup custom Consumer Id - Defaults to random Id
- *
- * @param consumerId - custom consumer ID
- * @return Builder object itself for chaining
- */
- public Builder setConsumerId(@Nonnull String consumerId) {
- this.consumerId = consumerId;
- return this;
- }
-
- /**
- * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID
- *
- * @param consumerGroup - custom Consumer Group
- * @return Builder object itself for chaining
- */
- public Builder setConsumerGroup(@Nonnull String consumerGroup) {
- this.consumerGroup = consumerGroup;
- return this;
- }
-
- /**
- * Setup Custom Subscriber timeout in ms - Default to no timeout limit
- *
- * @param timeoutMS timeout in milliseconds
- * @return Builder object itself for chaining
- */
- public Builder setTimeoutMS(@Nonnull int timeoutMS) {
- this.timeoutMS = timeoutMS;
- return this;
- }
-
- /**
- * Setup custom Subscriber Message Limit - Default to no limit
- *
- * @param messageLimit message Limit
- * @return Builder object itself for chaining
- */
- public Builder setMessageLimit(@Nonnull int messageLimit) {
- this.messageLimit = messageLimit;
- return this;
- }
-
- /**
- * Builds Immutable instance of {@link DMaaPMRSubscriberConfig}
- *
- * @return immutable DMaaP Subscriber Config Object
- */
- public DMaaPMRSubscriberConfig build() {
- return new DMaaPMRSubscriberConfig(this);
- }
-
- }
-
-
- public String getTimeoutMSParam() {
- return timeoutMSParam;
- }
-
- public void setTimeoutMSParam(String timeoutMSParam) {
- this.timeoutMSParam = timeoutMSParam;
- }
-
- public String getMessageLimitParam() {
- return messageLimitParam;
- }
-
- public void setMessageLimitParam(String messageLimitParam) {
- this.messageLimitParam = messageLimitParam;
- }
-
- public String getUriPrefix() {
- return uriPrefix;
- }
-
- public void setUriPrefix(String uriPreifix) {
- this.uriPrefix = uriPreifix;
- }
-
- /**
- * DMaaP MR Subscriber Consumer Id
- *
- * @return consumer Id
- */
- public String getConsumerId() {
- return consumerId;
- }
-
- /**
- * DMaaP MR Subscriber Consumer Group
- *
- * @return consumer group
- */
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- /**
- * DMaaP MR Subscriber Timeout in ms
- *
- * @return subscriber timeout ms
- */
- public int getTimeoutMS() {
- return timeoutMS;
- }
-
- /**
- * DMaaP MR Subscriber message limit
- *
- * @return subscriber message limit
- */
- public int getMessageLimit() {
- return messageLimit;
- }
-
- /**
- * DMaaP property file
- *
- * @return Properties
- */
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o;
- return Objects.equal(consumerId, that.consumerId) &&
- Objects.equal(consumerGroup, that.consumerGroup) &&
- Objects.equal(timeoutMS, that.timeoutMS) &&
- Objects.equal(messageLimit, that.messageLimit);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit);
- }
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.universalvesadapter.configs;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.onap.universalvesadapter.utils.DmaapConfig;
+
+import com.google.common.base.Objects;
+
+/**
+ * <p>
+ * Immutable DMaaP MR Configuration for Subscriber.
+ * <p>
+ * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/12/2016.
+ */
+public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig {
+
+ private final String consumerId;
+ private final String consumerGroup;
+ private final int timeoutMS;
+ private final int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPrefix;
+
+ public DMaaPMRSubscriberConfig(Builder builder) {
+ this.hostName = builder.hostName;
+ this.portNumber = builder.portNumber;
+ this.topicName = builder.topicName;
+ this.protocol = builder.protocol;
+ this.userName = builder.userName;
+ this.userPassword = builder.userPassword;
+ this.contentType = builder.contentType;
+ this.consumerId = builder.consumerId;
+ this.consumerGroup = builder.consumerGroup;
+ this.timeoutMS = builder.timeoutMS;
+ this.messageLimit = builder.messageLimit;
+ this.timeoutMSParam = builder.timeoutMSParam;
+ this.messageLimitParam = builder.messageLimitParam;
+ this.uriPrefix = builder.uriPreifix;
+ }
+
+ /**
+ * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object
+ */
+ public static class Builder {
+
+ private String hostName;
+ private Integer portNumber;
+ private String topicName;
+ private String userName;
+ private String userPassword;
+ private String protocol;
+ private String contentType;
+ private String consumerId;
+ private String consumerGroup;
+ private int timeoutMS;
+ private int messageLimit;
+ private String timeoutMSParam;
+ private String messageLimitParam;
+ private String uriPreifix;
+
+
+
+ public Builder(@Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException {
+
+ // Required Values
+ this.hostName = dmaapConfig.getDmaaphost();
+ this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER();
+ this.topicName = topicName;
+ // Default values
+
+ this.userName = dmaapConfig.getDEFAULT_USER_NAME();
+ this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD();
+ this.protocol = dmaapConfig.getDEFAULT_PROTOCOL();
+ this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE();
+ this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID();
+ this.consumerGroup = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX();
+ this.timeoutMS =dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS();
+ this.messageLimit = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT();
+ this.timeoutMSParam=dmaapConfig.getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME();
+ this.messageLimitParam=dmaapConfig.getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME();
+ this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX();
+
+ }
+
+
+ /**
+ * Setup for custom host port number - Defaults to 80.
+ *
+ * @param portNumber custom port number
+ * @return Builder object itself for chaining
+ */
+ public Builder setPortNumber(@Nonnull Integer portNumber) {
+ this.portNumber = portNumber;
+ return this;
+ }
+
+
+ /**
+ * Setup user name for authentication. If no username is provided authentication will be disabled
+ *
+ * @param userName user name for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserName(@Nonnull String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+
+ /**
+ * Setup user password for authentication. If no password is provided authentication will be disabled
+ *
+ * @param userPassword user password for DMaaP Topic Authentication
+ * @return Builder object itself for chaining
+ */
+ public Builder setUserPassword(@Nonnull String userPassword) {
+ this.userPassword = userPassword;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Subscriber protocol - Defaults to https.
+ * Note: Only http and https are currently supported.
+ *
+ * @param protocol protocol e.g. https or http
+ * @return Builder object itself for chaining
+ */
+ public Builder setProtocol(@Nonnull String protocol) {
+
+ this.protocol = normalizeValidateProtocol(protocol);
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber content-type - Defaults to application/json
+ *
+ * @param contentType content type e.g. application/json
+ * @return Builder object itself for chaining
+ */
+ public Builder setContentType(@Nonnull String contentType) {
+ final String normalizedContentType = normalizeValidateContentType(contentType);
+ this.contentType = normalizedContentType;
+ return this;
+ }
+
+
+ /**
+ * Setup custom Consumer Id - Defaults to random Id
+ *
+ * @param consumerId - custom consumer ID
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerId(@Nonnull String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ /**
+ * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID
+ *
+ * @param consumerGroup - custom Consumer Group
+ * @return Builder object itself for chaining
+ */
+ public Builder setConsumerGroup(@Nonnull String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ /**
+ * Setup Custom Subscriber timeout in ms - Default to no timeout limit
+ *
+ * @param timeoutMS timeout in milliseconds
+ * @return Builder object itself for chaining
+ */
+ public Builder setTimeoutMS(@Nonnull int timeoutMS) {
+ this.timeoutMS = timeoutMS;
+ return this;
+ }
+
+ /**
+ * Setup custom Subscriber Message Limit - Default to no limit
+ *
+ * @param messageLimit message Limit
+ * @return Builder object itself for chaining
+ */
+ public Builder setMessageLimit(@Nonnull int messageLimit) {
+ this.messageLimit = messageLimit;
+ return this;
+ }
+
+ /**
+ * Builds Immutable instance of {@link DMaaPMRSubscriberConfig}
+ *
+ * @return immutable DMaaP Subscriber Config Object
+ */
+ public DMaaPMRSubscriberConfig build() {
+ return new DMaaPMRSubscriberConfig(this);
+ }
+
+ }
+
+
+ public String getTimeoutMSParam() {
+ return timeoutMSParam;
+ }
+
+ public void setTimeoutMSParam(String timeoutMSParam) {
+ this.timeoutMSParam = timeoutMSParam;
+ }
+
+ public String getMessageLimitParam() {
+ return messageLimitParam;
+ }
+
+ public void setMessageLimitParam(String messageLimitParam) {
+ this.messageLimitParam = messageLimitParam;
+ }
+
+ public String getUriPrefix() {
+ return uriPrefix;
+ }
+
+ public void setUriPrefix(String uriPreifix) {
+ this.uriPrefix = uriPreifix;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Id
+ *
+ * @return consumer Id
+ */
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Group
+ *
+ * @return consumer group
+ */
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ /**
+ * DMaaP MR Subscriber Timeout in ms
+ *
+ * @return subscriber timeout ms
+ */
+ public int getTimeoutMS() {
+ return timeoutMS;
+ }
+
+ /**
+ * DMaaP MR Subscriber message limit
+ *
+ * @return subscriber message limit
+ */
+ public int getMessageLimit() {
+ return messageLimit;
+ }
+
+ /**
+ * DMaaP property file
+ *
+ * @return Properties
+ */
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o;
+ return Objects.equal(consumerId, that.consumerId) &&
+ Objects.equal(consumerGroup, that.consumerGroup) &&
+ Objects.equal(timeoutMS, that.timeoutMS) &&
+ Objects.equal(messageLimit, that.messageLimit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit);
+ }
+
+} \ No newline at end of file
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
index ce75a6e..1393e1b 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java
@@ -1,91 +1,87 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.dmaap;
-
-import java.io.IOException;
-
-import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
-import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
-import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
-import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
-import org.onap.universalvesadapter.utils.DmaapConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Creator {
-
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
- private DMaaPMRFactory dMaaPMRFactoryInstance;
- private String dmaaphost;
- private String publisherTopic;
- @Autowired
- private DmaapConfig dmaapConfig;
-
- // prop initializer
-
- public void propertyFileInitializer() {
-
- this.dmaaphost = dmaapConfig.getDmaaphost();
- this.publisherTopic = dmaapConfig.getPublisherTopic();
- this.dMaaPMRFactoryInstance = DMaaPMRFactory.create();
- debugLogger.info("The Hostname of DMaap is :" + dmaaphost);
-
-
- }
-
- // publisher
- public DMaaPMRPublisher getDMaaPMRPublisher(){
- propertyFileInitializer();
- DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null;
- try {
- dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(dmaaphost, publisherTopic,dmaapConfig).build();
- } catch (IOException e) {
- errorLogger.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause());
- }
- return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig);
- }
-
- // subscriber
- public DMaaPMRSubscriber getDMaaPMRSubscriber(String subcriberTopic){
- propertyFileInitializer();
- DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null;
- try {
- dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(dmaaphost, subcriberTopic, dmaapConfig).build();
- } catch (IOException e) {
-
- errorLogger.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause());
- }
-
- return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig);
-
- }
- public void setDmaapConfig(DmaapConfig dmaapConfig) {
- this.dmaapConfig = dmaapConfig;
- }
-
- public DmaapConfig getDmaapConfig() {
- return dmaapConfig;
- }
-
-}
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.universalvesadapter.dmaap;
+
+import java.io.IOException;
+
+import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
+import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class Creator {
+
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+ private DMaaPMRFactory dMaaPMRFactoryInstance;
+
+
+ @Autowired
+ private DmaapConfig dmaapConfig;
+
+ // prop initializer
+
+ public void propertyFileInitializer() {
+
+ this.dMaaPMRFactoryInstance = DMaaPMRFactory.create();
+
+ }
+
+ // publisher
+ public DMaaPMRPublisher getDMaaPMRPublisher(String publisherTopic){
+ propertyFileInitializer();
+ DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null;
+ try {
+ dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(publisherTopic,dmaapConfig).build();
+ } catch (IOException e) {
+ errorLogger.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause());
+ }
+ return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig);
+ }
+
+ // subscriber
+ public DMaaPMRSubscriber getDMaaPMRSubscriber(String subcriberTopic){
+ propertyFileInitializer();
+ DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null;
+ try {
+ dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(subcriberTopic, dmaapConfig).build();
+ } catch (IOException e) {
+
+ errorLogger.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause());
+ }
+
+ return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig);
+
+ }
+ public void setDmaapConfig(DmaapConfig dmaapConfig) {
+ this.dmaapConfig = dmaapConfig;
+ }
+
+ public DmaapConfig getDmaapConfig() {
+ return dmaapConfig;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
index 671fcc3..f2adc9b 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
@@ -1,290 +1,262 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018-2019 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.onap.universalvesadapter.dmaap.Creator;
-import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
-import org.onap.universalvesadapter.utils.DmaapConfig;
-import org.onap.universalvesadapter.utils.FetchDynamicConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.context.ApplicationContext;
-import org.springframework.core.Ordered;
-import org.springframework.stereotype.Component;
-
-//AdapterInitializer
-@Component
-public class VESAdapterInitializer implements CommandLineRunner, Ordered {
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
-
- @Autowired
- private Creator creator;
- @Autowired
- private DmaapConfig dmaapConfig;
- @Value("${defaultConfigFilelocation}")
- String defaultConfigFilelocation;
- @Value("${server.port}")
- String serverPort;
-
- private static Map<String, String> mappingFiles = new HashMap<String, String>();
- private static Map<String, String> env;
-
- @Autowired
- private ApplicationContext applicationContext;
-
- @Override
- public void run(String... args) throws Exception {
- debugLogger.info("The Default Config file Location:" + defaultConfigFilelocation.trim());
-
- if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) {
- errorLogger.error("Default Config file " + defaultConfigFilelocation.trim() + " is missing");
- System.exit(SpringApplication.exit(applicationContext, () -> {
- errorLogger.error("Application stoped due to missing default Config file");
- return -1;
- }));
- }
- env = System.getenv();
- for (Map.Entry<String, String> entry : env.entrySet()) {
- debugLogger.debug(entry.getKey() + ":" + entry.getValue());
- }
-
- //checks for DMaaP Host and Port No
- if( (env.get("DMAAPHOST")==null ||(env.get("MR_DEFAULT_PORT_NUMBER")==null))) {
-
- errorLogger.error("DMAAPHOST,MR_DEFAULT_PORT_NUMBER environment parameter is missing. Sample Usage is -\n sudo docker run -d -p 8085:8085/tcp --env MR_DEFAULT_PORT_NUMBER=3904 --env CONSUL_HOST=10.53.172.109 --env HOSTNAME=mvp-dcaegen2-service-mua --env CONFIG_BINDING_SERVICE=config_binding_service --env DMAAPHOST='10.53.172.156' nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest");
-
- System.exit(SpringApplication.exit(applicationContext, () -> {errorLogger.error("Application stoped due missing DMAAPHOST or MR_DEFAULT_PORT_NUMBER environment varibales.Please refer above example for environment varibales to pass ");return-1;}));
- }
-
- dmaapConfig.setDmaaphost(env.get("DMAAPHOST"));
- dmaapConfig.setDEFAULT_PORT_NUMBER(Integer.parseInt(env.get("MR_DEFAULT_PORT_NUMBER")));
- creator.setDmaapConfig(dmaapConfig);
- //check for consul details
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) {
- debugLogger.info(">>>Dynamic configuration to be used");
- FetchDynamicConfig.cbsCall(defaultConfigFilelocation);
-
- } else {
- debugLogger.info(">>>Static configuration to be used");
-
- }
- readJsonToMap(defaultConfigFilelocation);
-
- //prepareDatabase();
- //fetchMappingFile();
-
- debugLogger.info("Triggering controller's start url ");
- executecurl("http://localhost:"+serverPort+"/start");
- }
-
-
- private static String executecurl(String url) {
-
- debugLogger.info("Running curl command for url:{}",url);
- String[] command = { "curl", "-v", url };
- ProcessBuilder process = new ProcessBuilder(command);
- Process p;
- String result = null;
- try {
- p = process.start();
- try(InputStreamReader ipr = new InputStreamReader(p.getInputStream());
- BufferedReader reader = new BufferedReader(ipr)){
- StringBuilder builder = new StringBuilder();
- String line;
-
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- }
- result = builder.toString();
- }
- } catch (IOException e) {
- errorLogger.error("error", e);
- }
- return result;
-
- }
-
- private void readJsonToMap(String configFile) {
- try {
- JSONArray collectorArray=CollectorConfigPropertyRetrival.collectorConfigArray(configFile);
-
- for (int i = 0; i < collectorArray.size(); i++) {
- JSONObject obj2 = (JSONObject) collectorArray.get(i);
-
- if (obj2.containsKey("mapping-files")) {
-
- JSONArray a1 = (JSONArray) obj2.get("mapping-files");
-
- for (int j = 0; j < a1.size(); j++) {
- JSONObject obj3 = (JSONObject) a1.get(j);
- Set<Entry<String, String>> set = obj3.entrySet();
-
- for (Entry<String, String> entry : set) {
-
- mappingFiles.put(entry.getKey(), entry.getValue());
- }
- }
-
- }
- }
-
- } catch (Exception e) {
- errorLogger.error("Exception occured while reading Collector config file cause: ",e.getCause());
- }
-
- }
-
-
- /* private void prepareDatabase() throws IOException {
-
-
- debugLogger.info("The Default Mapping file Location:" + defaultMappingFileLocation.trim());
-
- if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) {
- errorLogger.error(
- "Default mapping file " + defaultMappingFileLocation.trim() + " is missing");
- System.exit(SpringApplication.exit(applicationContext, () -> {
- errorLogger.error("Application stoped due to missing default mapping file");
- return -1;
- }));
- }
-
- File file = new File(
- ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile());
-
- try (FileInputStream fileInputStream = new FileInputStream(file)) {
- bytesArray = new byte[(int) file.length()];
- fileInputStream.read(bytesArray);
-
- } catch (IOException e1) {
- errorLogger.error("Exception Occured while reading the default mapping file ,Cause: "
- + e1.getMessage(), e1);
- // exit on missing default mapping file
- System.exit(SpringApplication.exit(applicationContext, () -> {
- errorLogger.error("Application stoped due to missing default mapping file");
- return -1;
- }));
- }
-
- try (Connection con = DriverManager.getConnection(dBurl, user, pwd);
- // creating table if not exist
- PreparedStatement pstmt11 =
- con.prepareStatement("CREATE TABLE IF NOT EXISTS public."
- + MappingFileTableName + "\r\n" + "(\r\n"
- + " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n"
- + " mappingfilecontents bytea,\r\n"
- + " mimetype character varying COLLATE pg_catalog.\"default\",\r\n"
- + " file_name character varying COLLATE pg_catalog.\"default\",\r\n"
- + " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n"
- + ")\r\n" + "WITH (\r\n" + " OIDS = FALSE\r\n" + ")\r\n"
- + "TABLESPACE pg_default;")) {
-
- metricsLogger.info("Postgresql Connection successful...");
- debugLogger.debug("Connection object:{}" , con.toString());
-
- pstmt11.executeUpdate();
- debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully....");
-
- if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) {
-
- try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO "
- + MappingFileTableName
- + "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;")) {
- pstmt.setString(1, defaultEnterpriseId);
- pstmt.setBytes(2, bytesArray);
- pstmt.setString(3, "text/xml");
- pstmt.setString(4, file.getName());
-
- pstmt.executeUpdate();
- debugLogger.info("Made sure that default mapping file is present in table");
- }
- } else {
- errorLogger.error(file.getName() + " is empty");
- // exit on empty mapping file
- System.exit(SpringApplication.exit(applicationContext, () -> {
- errorLogger.error("Application stoped beacuase default mapping file is empty..");
- return -1;
- }));
- }
-
- } catch (SQLException e) {
- errorLogger.error("Received exception : " + e.getMessage(), e);
- // exit on SqlException
- System.exit(SpringApplication.exit(applicationContext, () -> {
- errorLogger.error("Application Stoped due to ", e.getCause());
- return -1;
- }));
- }
-
- }*/
- /*public void fetchMappingFile() {
-
- try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {
- debugLogger.info("Retrieving data from DB");
- PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file");
- ResultSet rs = pstmt.executeQuery();
- // parsing the column each time is a linear search
- int column1Pos = rs.findColumn("enterpriseid");
- int column2Pos = rs.findColumn("mappingfilecontents");
- String hexString;
- while (rs.next()) {
- String column1 = rs.getString(column1Pos);
- String column2 = rs.getString(column2Pos);
- hexString = column2.substring(2);
- byte[] bytes = Hex.decodeHex(hexString.toCharArray());
- String data = new String(bytes, "UTF-8");
- mappingFiles.put(column1, data);
- }
- debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + mappingFiles.size());
- } catch (Exception e) {
- errorLogger.error("Error occured due to :" + e.getMessage());
- e.printStackTrace();
- }
-
- }*/
-
- public static Map<String, String> getMappingFiles() {
- return mappingFiles;
- }
-
- public static void setMappingFiles(Map<String, String> mappingFiles) {
- VESAdapterInitializer.mappingFiles = mappingFiles;
- }
-
- @Override
- public int getOrder() {
- return 0;
- }
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2018-2019 TechMahindra
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.universalvesadapter.service;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
+import org.onap.universalvesadapter.utils.FetchDynamicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Component;
+
+// AdapterInitializer
+@Component
+public class VESAdapterInitializer implements CommandLineRunner, Ordered {
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+
+ @Value("${defaultConfigFilelocation}")
+ String defaultConfigFilelocation;
+ @Value("${server.port}")
+ String serverPort;
+
+ private static Map<String, String> mappingFiles = new HashMap<String, String>();
+ private static Map<String, String> env;
+
+ @Autowired
+ private ApplicationContext applicationContext;
+
+ @Override
+ public void run(String... args) throws Exception {
+ debugLogger.info("The Default Config file Location:"
+ + defaultConfigFilelocation.trim());
+ // final Path configFilePath =
+ // Paths.get(defaultConfigFilelocation.trim()).toAbsolutePath();
+ // File f = new File(configFilePath.toString());
+
+ if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) {
+ errorLogger.error("Default Config file " + defaultConfigFilelocation.trim()
+ + " is missing");
+ System.exit(SpringApplication.exit(applicationContext, () -> {
+ errorLogger.error(
+ "Application stoped due to missing default Config file");
+ return -1;
+ }));
+ }
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ debugLogger.debug(entry.getKey() + ":" + entry.getValue());
+ }
+
+ // check for consul details
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
+ && env.containsKey("HOSTNAME")) {
+ debugLogger.info(">>>Dynamic configuration to be used");
+ FetchDynamicConfig.cbsCall(defaultConfigFilelocation);
+
+ } else {
+ debugLogger.info(">>>Static configuration to be used");
+
+ }
+ readJsonToMap(defaultConfigFilelocation);
+
+ // prepareDatabase();
+ // fetchMappingFile();
+
+ debugLogger.info("Triggering controller's start url ");
+ executecurl("http://localhost:" + serverPort + "/start");
+ }
+
+
+ private static String executecurl(String url) {
+
+ debugLogger.info("Running curl command for url:{}", url);
+ String[] command = {"curl", "-v", url};
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ try (InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr)) {
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ }
+ } catch (IOException e) {
+ errorLogger.error("error", e);
+ }
+ return result;
+
+ }
+
+ private void readJsonToMap(String configFile) {
+ try {
+ JSONArray collectorArray = CollectorConfigPropertyRetrival
+ .collectorConfigArray(configFile);
+
+ for (int i = 0; i < collectorArray.size(); i++) {
+ JSONObject obj2 = (JSONObject) collectorArray.get(i);
+
+ if (obj2.containsKey("mapping-files")) {
+
+ JSONArray a1 = (JSONArray) obj2.get("mapping-files");
+
+ for (int j = 0; j < a1.size(); j++) {
+ JSONObject obj3 = (JSONObject) a1.get(j);
+ Set<Entry<String, String>> set = obj3.entrySet();
+
+ for (Entry<String, String> entry : set) {
+
+ mappingFiles.put(entry.getKey(),
+ entry.getValue());
+ }
+ }
+
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ errorLogger.error(
+ "Exception occured while reading Collector config file cause: ",
+ e.getCause());
+ }
+
+ }
+
+
+ /*
+ * private void prepareDatabase() throws IOException {
+ *
+ *
+ * debugLogger.info("The Default Mapping file Location:" +
+ * defaultMappingFileLocation.trim());
+ *
+ * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) {
+ * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() +
+ * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> {
+ * errorLogger.error("Application stoped due to missing default mapping file"); return -1;
+ * })); }
+ *
+ * File file = new File(
+ * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile());
+ *
+ * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new
+ * byte[(int) file.length()]; fileInputStream.read(bytesArray);
+ *
+ * } catch (IOException e1) {
+ * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " +
+ * e1.getMessage(), e1); // exit on missing default mapping file
+ * System.exit(SpringApplication.exit(applicationContext, () -> {
+ * errorLogger.error("Application stoped due to missing default mapping file"); return -1;
+ * })); }
+ *
+ * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if
+ * not exist PreparedStatement pstmt11 =
+ * con.prepareStatement("CREATE TABLE IF NOT EXISTS public." + MappingFileTableName + "\r\n"
+ * + "(\r\n" +
+ * " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" +
+ * " mappingfilecontents bytea,\r\n" +
+ * " mimetype character varying COLLATE pg_catalog.\"default\",\r\n" +
+ * " file_name character varying COLLATE pg_catalog.\"default\",\r\n" +
+ * " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" +
+ * "WITH (\r\n" + " OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) {
+ *
+ * metricsLogger.info("Postgresql Connection successful...");
+ * debugLogger.debug("Connection object:{}" , con.toString());
+ *
+ * pstmt11.executeUpdate();
+ * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully....");
+ *
+ * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) {
+ *
+ * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName
+ * +
+ * "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;"
+ * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray);
+ * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName());
+ *
+ * pstmt.executeUpdate();
+ * debugLogger.info("Made sure that default mapping file is present in table"); } } else {
+ * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file
+ * System.exit(SpringApplication.exit(applicationContext, () -> {
+ * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return
+ * -1; })); }
+ *
+ * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(),
+ * e); // exit on SqlException System.exit(SpringApplication.exit(applicationContext, () ->
+ * { errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); }
+ *
+ * }
+ */
+ /*
+ * public void fetchMappingFile() {
+ *
+ * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {
+ * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt =
+ * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery();
+ * // parsing the column each time is a linear search int column1Pos =
+ * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents");
+ * String hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String
+ * column2 = rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes =
+ * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8");
+ * mappingFiles.put(column1, data); }
+ * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" +
+ * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :"
+ * + e.getMessage()); e.printStackTrace(); }
+ *
+ * }
+ */
+
+ public static Map<String, String> getMappingFiles() {
+ return mappingFiles;
+ }
+
+ public static void setMappingFiles(Map<String, String> mappingFiles) {
+ VESAdapterInitializer.mappingFiles = mappingFiles;
+ }
+
+ @Override
+ public int getOrder() {
+ return 0;
+ }
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index a5b88ec..cbfeead 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -1,164 +1,220 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2018 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.service;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
-import org.onap.universalvesadapter.dmaap.Creator;
-import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
-import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
-import org.onap.universalvesadapter.exception.DMaapException;
-import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
-import org.onap.universalvesadapter.utils.DmaapConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * Service that starts the universal ves adapter module to listen for events
- *
- * @author kmalbari
- *
- */
-@Component
-public class VesService {
-
- private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
-
- private boolean isRunning = true;
- @Value("${defaultConfigFilelocation}")
- private String defaultConfigFilelocation;
- @Autowired
- private Creator creator;
- @Autowired
- private UniversalEventAdapter eventAdapter;
- @Autowired
- private DmaapConfig dmaapConfig;
- private static List<String> list = new LinkedList<String>();
-
-
- /**
- * method triggers universal VES adapter module.
- */
- public void start() throws MapperConfigException {
- debugLogger.info("Creating Subcriber and Publisher with creator.............");
-
-
- DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
-
- String topicArray[]= CollectorConfigPropertyRetrival.getProperyArray("subscriberTopic",defaultConfigFilelocation);
-
-
- ExecutorService executorService=Executors.newFixedThreadPool(topicArray.length);
- for(int i=0;i<topicArray.length;i++) {
- String topicName =topicArray[i];
- DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicArray[i]);
-
- executorService.submit(new Runnable() {
-
- @Override
- public void run(){
-
- Thread.currentThread().setName(topicName);
- metricsLogger.info("fetch and publish from and to Dmaap started:"+Thread.currentThread().getName());
- int pollingInternalInt=dmaapConfig.getPollingInterval();
- debugLogger.info("The Polling Interval in Milli Second is :{}" +pollingInternalInt);
- debugLogger.info("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
- while (true) {
- synchronized (this) {
- for (String incomingJsonString : subcriber.fetchMessages().getFetchedMessages()) {
- list.add(incomingJsonString);
-
- }
-
- if (list.isEmpty()) {
- try {
- Thread.sleep(pollingInternalInt);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- debugLogger.debug("number of messages to be converted :{}", list.size());
-
- if (!list.isEmpty()) {
- String val = ((LinkedList<String>) list).removeFirst();
- List<String> messages = new ArrayList<>();
- String vesEvent = processReceivedJson(val);
- if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
- messages.add(vesEvent);
- publisher.publish(messages);
- metricsLogger.info("Message successfully published to DMaaP Topic");
- }
-
- }
-
- }
- }
-
-
-
- }
- });
- }
-
-
-
- }
-
- /**
- * method stops universal ves adapter module
- */
- public void stop() {
- isRunning = false;
- }
-
- private String processReceivedJson(String incomingJsonString) {
- String outgoingJsonString = null;
- if (!"".equals(incomingJsonString)) {
-
- try {
-
- outgoingJsonString = eventAdapter.transform(incomingJsonString);
-
- } catch (VesException exception) {
- errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
- debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
- } catch (DMaapException e) {
- errorLogger.error("Received exception : {}", e.getMessage());
- }
- }
- return outgoingJsonString;
- }
-}
-
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2018-2019 TechMahindra
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.universalvesadapter.service;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.exception.MapperConfigException;
+import org.onap.universalvesadapter.exception.VesException;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
+import org.onap.universalvesadapter.utils.DmaapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Service that starts the universal ves adapter module to listen for events
+ *
+ * @author kmalbari
+ *
+ */
+/**
+ * @author PM00501616
+ *
+ */
+/**
+ * @author PM00501616
+ *
+ */
+@Component
+public class VesService {
+
+ private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+
+ private boolean isRunning = true;
+ @Value("${defaultConfigFilelocation}")
+ private String defaultConfigFilelocation;
+ @Autowired
+ private Creator creator;
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+ @Autowired
+ private DmaapConfig dmaapConfig;
+ @Autowired
+ private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival;
+ private static List<String> list = new LinkedList<String>();
+
+
+ /**
+ * method triggers universal VES adapter module.
+ */
+ public void start() throws MapperConfigException {
+ debugLogger.info("Creating Subcriber and Publisher with creator.............");
+ String topicName = null;
+ String publisherTopic = null;
+ // Hashmap of subscriber and publisher details in correspondence to the respective
+ // collectors in kv file
+ Map<String, String> dmaapTopics = collectorConfigPropertyRetrival.getDmaapTopics(
+ "stream_subscriber", "stream_publisher", defaultConfigFilelocation);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size());
+ for (Map.Entry<String, String> entry : dmaapTopics.entrySet()) {
+ String threadName = entry.getKey();
+ // subcriber and corresponding publisher topics in a Map
+ Map<String, String> subpubTopics = collectorConfigPropertyRetrival
+ .getTopics(entry.getKey(), entry.getValue(),
+ defaultConfigFilelocation);
+ for (Map.Entry<String, String> entry2 : subpubTopics.entrySet()) {
+ topicName = entry2.getKey();
+ publisherTopic = entry2.getValue();
+ }
+
+
+ // Publisher and subcriber as per each collector
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName);
+
+ DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic);
+ debugLogger.info("Created scriber topic:" + topicName + "publisher topic:"
+ + publisherTopic);
+
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run() {
+
+ Thread.currentThread().setName(threadName);
+ metricsLogger.info(
+ "fetch and publish from and to Dmaap started:"
+ + Thread.currentThread()
+ .getName());
+ int pollingInternalInt = dmaapConfig.getPollingInterval();
+ debugLogger.info(
+ "The Polling Interval in Milli Second is :{}"
+ + pollingInternalInt);
+ debugLogger.info(
+ "starting subscriber & publisher thread:{}",
+ Thread.currentThread().getName());
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : subcriber
+ .fetchMessages()
+ .getFetchedMessages()) {
+ list.add(incomingJsonString);
+
+ }
+
+ if (list.isEmpty()) {
+ try {
+ Thread.sleep(pollingInternalInt);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ debugLogger.debug(
+ "number of messages to be converted :{}",
+ list.size());
+
+ if (!list.isEmpty()) {
+ String val = ((LinkedList<String>) list)
+ .removeFirst();
+ List<String> messages =
+ new ArrayList<>();
+ String vesEvent =
+ processReceivedJson(
+ val);
+ if (vesEvent != null && (!(vesEvent
+ .isEmpty()
+ || vesEvent.equals(
+ "")))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+
+ metricsLogger.info(
+ "Message successfully published to DMaaP Topic-\n"
+ + vesEvent);
+ }
+
+ }
+
+ }
+ }
+
+
+
+ }
+ });
+ }
+
+
+
+ }
+
+ /**
+ * method stops universal ves adapter module
+ */
+ public void stop() {
+ isRunning = false;
+ }
+
+
+ /**
+ * method for processing the incoming json to ves
+ *
+ * @param incomingJsonString
+ * @return ves
+ */
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString);
+
+ } catch (VesException exception) {
+ errorLogger.error(
+ "Received exception : {},{}"
+ + exception.getMessage(),
+ exception);
+ debugLogger.warn(
+ "APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ errorLogger.error("Received exception : {}", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
+}
+
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
index 2262b9b..afa5c7c 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
@@ -1,88 +1,172 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2019 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CollectorConfigPropertyRetrival {
-
- @Value("${defaultConfigFilelocation}")
- public String defaultConfigFilelocation;
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
- private static JSONArray array;
-
- public static JSONArray collectorConfigArray(String configFile) {
- try {
- JSONParser parser = new JSONParser();
-
- File file = new File(ClassLoader.getSystemResource(configFile.trim()).getFile());
-
- FileReader fileReader = new FileReader(file);
- JSONObject obj = (JSONObject) parser.parse(fileReader);
- JSONObject appobj = (JSONObject) obj.get("app_preferences");
- array = (JSONArray) appobj.get("collectors");
-
- debugLogger.info("Retrieved JsonArray from Collector Config File");
-
- } catch (ParseException e) {
- errorLogger.error("ParseException occured at position:", e.getPosition());
- } catch (FileNotFoundException e) {
-
- errorLogger.error("Collector Config File is not found..", e.getMessage());
- } catch (IOException e) {
-
- errorLogger.error("Error occured due to :", e.getMessage());
- }
-
- return array;
-
- }
-
- public static String[] getProperyArray(String properyName, String defaultConfigFilelocation) {
- JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);
-
- String[] propertyArray = new String[jsonArray.size()];
-
- for (int k = 0; k < jsonArray.size(); k++) {
-
- JSONObject collJson = (JSONObject) jsonArray.get(k);
-
- propertyArray[k] = (String) collJson.get(properyName);
- }
- debugLogger.info("returning " + properyName + " array from Collector Config");
- return propertyArray;
-
- }
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2018-2019 TechMahindra
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.universalvesadapter.utils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ResourceUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Component
+public class CollectorConfigPropertyRetrival {
+
+
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+ private static JSONArray array;
+ @Autowired
+ private DmaapConfig dmaapConfig;
+
+ public static JSONArray collectorConfigArray(String configFile) {
+ try {
+ JSONParser parser = new JSONParser();
+ String content = readFile(configFile);
+ JSONObject obj = (JSONObject) parser.parse(content);
+ JSONObject appobj = (JSONObject) obj.get("app_preferences");
+ array = (JSONArray) appobj.get("collectors");
+
+ debugLogger.info("Retrieved JsonArray from Collector Config File");
+
+ } catch (ParseException e) {
+ errorLogger.error("ParseException occured at position:", e.getPosition());
+ }
+
+
+ return array;
+
+ }
+
+ public static String[] getProperyArray(String properyName,
+ String defaultConfigFilelocation) {
+ JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);
+
+ String[] propertyArray = new String[jsonArray.size()];
+
+ for (int k = 0; k < jsonArray.size(); k++) {
+
+ JSONObject collJson = (JSONObject) jsonArray.get(k);
+
+ propertyArray[k] = (String) collJson.get(properyName);
+ }
+ debugLogger.info("returning " + properyName + " array from Collector Config");
+ return propertyArray;
+
+ }
+
+ public Map<String, String> getDmaapTopics(String subscriber, String publisher,
+ String defaultConfigFilelocation) {
+ JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);
+
+ Map<String, String> dmaapTopics = new HashMap<>();
+
+ for (int k = 0; k < jsonArray.size(); k++) {
+
+ JSONObject collJson = (JSONObject) jsonArray.get(k);
+
+ dmaapTopics.put(collJson.get(subscriber).toString(),
+ collJson.get(publisher).toString());
+
+ }
+ debugLogger.info("returning Dmaap topics from Collector Config");
+ return dmaapTopics;
+
+ }
+
+ public Map<String, String> getTopics(String subscriber, String publisher,
+ String defaultConfigFilelocation) {
+ Map<String, String> dmaapTopics = new HashMap<>();
+
+ try {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String content = readFile(defaultConfigFilelocation);
+ // read JSON like DOM Parser
+ JsonNode rootNode = objectMapper.readTree(content);
+ JsonNode subscriberUrl = rootNode.path("streams_subscribes")
+ .path(subscriber).path("dmaap_info").path("topic_url");
+ JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher)
+ .path("dmaap_info").path("topic_url");
+
+ dmaapTopics.put(getTopicName(subscriberUrl.asText()),
+ getTopicName(publisherUrl.asText()));
+ setDmaapConfig(subscriberUrl.asText());
+ } catch (IOException ex) {
+ errorLogger.error("IOException occured:" + ex.getMessage());
+
+ } catch (URISyntaxException e) {
+
+ errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason());
+ }
+
+ return dmaapTopics;
+
+ }
+
+ public String getTopicName(String url) throws URISyntaxException {
+ URI uri = new URI(url);
+ String path = uri.getPath();
+ String idStr = path.substring(path.lastIndexOf('/') + 1);
+ return idStr;
+
+ }
+
+ public void setDmaapConfig(String url) throws URISyntaxException {
+ URI uri = new URI(url);
+ dmaapConfig.setDmaaphost(uri.getHost());
+ dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort());
+
+ }
+
+ public static String readFile(String configFileName) {
+ String content = null;
+ File file = null;
+
+ try {
+ file = ResourceUtils.getFile("classpath:" + configFileName);
+ content = new String(Files.readAllBytes(file.toPath()));
+ } catch (FileNotFoundException e) {
+ errorLogger.error("colud not find file :", configFileName);
+
+ } catch (IOException e) {
+ errorLogger.error("unable to read the file , reason:", e.getCause());
+ }
+
+ return content;
+
+ }
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
index 02b54c9..82c23b3 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java
@@ -1,287 +1,272 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-package org.onap.universalvesadapter.utils;
-
-import javax.validation.constraints.NotEmpty;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.PropertySource;
-import org.springframework.stereotype.Component;
-
-@Component
-@PropertySource(value = {"classpath:application.properties","classpath:mapper.properties"})
-@ConfigurationProperties
-public class DmaapConfig {
-
- // Hostname of DMaaP to be taken from ENV var
- @NotEmpty
- private String dmaaphost;
-
- // default port number to be taken from ENV var
- @NotEmpty
- private int DEFAULT_PORT_NUMBER;
-
- @Value("${mr.POLLING_INTERVAL}")
- @NotEmpty
- private int pollingInterval;
-
- // default to no username
- @Value("${mr.DEFAULT_USER_NAME}")
- private String DEFAULT_USER_NAME;
-
-
- //defaults to no userPassword
- @Value("${mr.DEFAULT_USER_PASSWORD}")
- private String DEFAULT_USER_PASSWORD;
-
- //defaults to using https protocol
- @Value("${mr.DEFAULT_PROTOCOL}")
- @NotEmpty
- private String DEFAULT_PROTOCOL;
-
- //defaults to json content type
- @Value("${mr.DEFAULT_CONTENT_TYPE}")
- @NotEmpty
- private String DEFAULT_CONTENT_TYPE;
-
- @Value("${mr.DMAAP_URI_PATH_PREFIX}")
- @NotEmpty
- private String DMAAP_URI_PATH_PREFIX;
-
- @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}")
- @NotEmpty
- private String DMAAP_DEFAULT_CONSUMER_ID;
-
- @Value("${mr.DMAAP_GROUP_PREFIX}")
- @NotEmpty
- private String DMAAP_GROUP_PREFIX;
-
- // Publisher Constants
-
- //Dmaap Publisher Topic
- @Value("${mr.publisher.topic}")
- @NotEmpty
- private String publisherTopic;
-
- //disable batching by default
- @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}")
- @NotEmpty
- private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
-
- //default recovery messages size
- @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}")
- @NotEmpty
- private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
-
-//number of retries when flushing messages
- @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
- @NotEmpty
- private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
-
- //delay in retrying for flushing messages
- @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}")
- @NotEmpty
- private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
-
- // Subscriber Constants
-
- @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
- @NotEmpty
- private int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
-
- @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
- @NotEmpty
- private int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
-
- @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
- @NotEmpty
- private String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
-
- @Value("${mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
- @NotEmpty
- private String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
-
- @Value("${mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
- @NotEmpty
- private String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
-
- public void setDmaaphost(String dmaaphost) {
- this.dmaaphost = dmaaphost;
- }
-
- public String getDmaaphost() {
- return dmaaphost;
- }
-
- public int getDEFAULT_PORT_NUMBER() {
- return DEFAULT_PORT_NUMBER;
- }
-
- public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) {
- DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER;
- }
-
- public String getDEFAULT_USER_NAME() {
- return DEFAULT_USER_NAME;
- }
-
- public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) {
- DEFAULT_USER_NAME = dEFAULT_USER_NAME;
- }
-
- public String getDEFAULT_USER_PASSWORD() {
- return DEFAULT_USER_PASSWORD;
- }
-
- public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) {
- DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD;
- }
-
- public String getDEFAULT_PROTOCOL() {
- return DEFAULT_PROTOCOL;
- }
-
- public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) {
- DEFAULT_PROTOCOL = dEFAULT_PROTOCOL;
- }
-
- public String getDEFAULT_CONTENT_TYPE() {
- return DEFAULT_CONTENT_TYPE;
- }
-
- public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) {
- DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE;
- }
-
- public String getDMAAP_URI_PATH_PREFIX() {
- return DMAAP_URI_PATH_PREFIX;
- }
-
- public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) {
- DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX;
- }
-
- public String getDMAAP_DEFAULT_CONSUMER_ID() {
- return DMAAP_DEFAULT_CONSUMER_ID;
- }
-
- public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) {
- DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID;
- }
-
- public String getDMAAP_GROUP_PREFIX() {
- return DMAAP_GROUP_PREFIX;
- }
-
- public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) {
- DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX;
- }
-
- public String getPublisherTopic() {
- return publisherTopic;
- }
-
- public void setPublisherTopic(String publisherTopic) {
- this.publisherTopic = publisherTopic;
- }
-
- public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() {
- return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
- }
-
- public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) {
- this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
- }
-
- public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() {
- return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
- }
-
- public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(
- int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) {
- this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
- }
-
- public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() {
- return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
- }
-
- public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) {
- this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
- }
-
- public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() {
- return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
- }
-
- public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) {
- this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
- }
-
- public int getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
- return subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
- }
-
- public void setsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
- this.subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
- }
-
- public int getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
- return subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
- }
-
- public void setsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
- this.subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
- }
-
- public String getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
- return subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
- }
-
- public void setsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
- this.subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
- }
-
- public String getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
- return subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
- }
-
- public void setsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
- this.subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
- }
-
- public String getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
- return subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
- }
-
- public void setsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
- this.subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
- }
-
- public int getPollingInterval() {
- return pollingInterval;
- }
-
- public void setPollingInterval(int pollingInterval) {
- this.pollingInterval = pollingInterval;
- }
-
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+package org.onap.universalvesadapter.utils;
+
+import javax.validation.constraints.NotEmpty;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.stereotype.Component;
+
+@Component
+@PropertySource(value = {"classpath:application.properties","classpath:mapper.properties"})
+@ConfigurationProperties
+public class DmaapConfig {
+
+ @NotEmpty
+ private String dmaaphost;
+
+ @NotEmpty
+ private int DEFAULT_PORT_NUMBER;
+
+ @Value("${mr.POLLING_INTERVAL}")
+ @NotEmpty
+ private int pollingInterval;
+
+ // default to no username
+ @Value("${mr.DEFAULT_USER_NAME}")
+ private String DEFAULT_USER_NAME;
+
+
+ //defaults to no userPassword
+ @Value("${mr.DEFAULT_USER_PASSWORD}")
+ private String DEFAULT_USER_PASSWORD;
+
+ //defaults to using https protocol
+ @Value("${mr.DEFAULT_PROTOCOL}")
+ @NotEmpty
+ private String DEFAULT_PROTOCOL;
+
+ //defaults to json content type
+ @Value("${mr.DEFAULT_CONTENT_TYPE}")
+ @NotEmpty
+ private String DEFAULT_CONTENT_TYPE;
+
+ @Value("${mr.DMAAP_URI_PATH_PREFIX}")
+ @NotEmpty
+ private String DMAAP_URI_PATH_PREFIX;
+
+ @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}")
+ @NotEmpty
+ private String DMAAP_DEFAULT_CONSUMER_ID;
+
+ @Value("${mr.DMAAP_GROUP_PREFIX}")
+ @NotEmpty
+ private String DMAAP_GROUP_PREFIX;
+
+ // Publisher Constants
+
+ //disable batching by default
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+
+ //default recovery messages size
+ @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}")
+ @NotEmpty
+ private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+
+//number of retries when flushing messages
+ @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+
+ //delay in retrying for flushing messages
+ @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}")
+ @NotEmpty
+ private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+
+ // Subscriber Constants
+
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}")
+ @NotEmpty
+ private int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}")
+ @NotEmpty
+ private int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+
+ @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}")
+ @NotEmpty
+ private String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+
+ @Value("${mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+
+ @Value("${mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}")
+ @NotEmpty
+ private String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+
+ public void setDmaaphost(String dmaaphost) {
+ this.dmaaphost = dmaaphost;
+ }
+
+ public String getDmaaphost() {
+ return dmaaphost;
+ }
+
+ public int getDEFAULT_PORT_NUMBER() {
+ return DEFAULT_PORT_NUMBER;
+ }
+
+ public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) {
+ DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER;
+ }
+
+ public String getDEFAULT_USER_NAME() {
+ return DEFAULT_USER_NAME;
+ }
+
+ public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) {
+ DEFAULT_USER_NAME = dEFAULT_USER_NAME;
+ }
+
+ public String getDEFAULT_USER_PASSWORD() {
+ return DEFAULT_USER_PASSWORD;
+ }
+
+ public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) {
+ DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD;
+ }
+
+ public String getDEFAULT_PROTOCOL() {
+ return DEFAULT_PROTOCOL;
+ }
+
+ public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) {
+ DEFAULT_PROTOCOL = dEFAULT_PROTOCOL;
+ }
+
+ public String getDEFAULT_CONTENT_TYPE() {
+ return DEFAULT_CONTENT_TYPE;
+ }
+
+ public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) {
+ DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE;
+ }
+
+ public String getDMAAP_URI_PATH_PREFIX() {
+ return DMAAP_URI_PATH_PREFIX;
+ }
+
+ public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) {
+ DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX;
+ }
+
+ public String getDMAAP_DEFAULT_CONSUMER_ID() {
+ return DMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) {
+ DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID;
+ }
+
+ public String getDMAAP_GROUP_PREFIX() {
+ return DMAAP_GROUP_PREFIX;
+ }
+
+ public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) {
+ DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE;
+ }
+
+ public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() {
+ return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(
+ int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) {
+ this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ }
+
+ public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE;
+ }
+
+ public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() {
+ return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) {
+ this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE;
+ }
+
+ public int getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() {
+ return subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public void setsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) {
+ this.subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS;
+ }
+
+ public int getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() {
+ return subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public void setsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) {
+ this.subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT;
+ }
+
+ public String getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() {
+ return subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public void setsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) {
+ this.subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX;
+ }
+
+ public String getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() {
+ return subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public void setsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) {
+ this.subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME;
+ }
+
+ public String getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() {
+ return subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+ public void setsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) {
+ this.subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME;
+ }
+
+ public int getPollingInterval() {
+ return pollingInterval;
+ }
+
+ public void setPollingInterval(int pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+
+}
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java
index af219ce..4bc66bb 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java
@@ -1,196 +1,223 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DCAE
-* ================================================================================
-* Copyright 2019 TechMahindra
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.universalvesadapter.utils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Map;
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-@Component
-public class FetchDynamicConfig {
-
- private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
- private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
-
- private static String url;
- public static String retString;
- public static String retCBSString;
- private static Map<String, String> env;
-
- public FetchDynamicConfig() {
- }
-
- public static void cbsCall(String configFile) {
-
- env = System.getenv();
- Boolean areEqual;
- // Call consul api and identify the CBS Service address and port
- getconsul();
- // Construct and invoke CBS API to get application Configuration
- getCBS();
- // Verify if data has changed
- areEqual = verifyConfigChange(configFile);
-
- if (!areEqual) {
- FetchDynamicConfig fc = new FetchDynamicConfig();
- fc.writefile(retCBSString,configFile);
- } else {
- debugLogger.info("New config pull results identical - " + configFile + " NOT refreshed");
- }
- }
-
-
- private static void getconsul() {
- url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
- retString = executecurl(url);
- debugLogger.info("CBS details fetched from Consul");
- }
-
- public static boolean verifyConfigChange(String configFile) {
-
- boolean areEqual = false;
- // Read current data
- try {
- File f = new File(configFile);
- if (f.exists() && !f.isDirectory()) {
-
- String jsonData = readFile(configFile);
- JSONObject jsonObject = new JSONObject(jsonData);
-
- ObjectMapper mapper = new ObjectMapper();
-
- JsonNode tree1 = mapper.readTree(jsonObject.toString());
- JsonNode tree2 = mapper.readTree(retCBSString);
- areEqual = tree1.equals(tree2);
- debugLogger.info("Comparison value:" + areEqual);
- } else {
- debugLogger.info("First time config file read: " + configFile);
- }
-
- } catch (IOException e) {
- errorLogger.error("Comparison with new fetched data failed" + e.getMessage());
-
- }
-
- return areEqual;
-
- }
-
- public static void getCBS() {
-
- // consul return as array
- JSONTokener temp = new JSONTokener(retString);
- JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
-
- String urlPart1 = null;
- if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
-
- urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
-
- }
- debugLogger.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
-
- if (env.containsKey("HOSTNAME")) {
- url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
- retCBSString = executecurl(url);
- } else if (env.containsKey("SERVICE_NAME")) {
- url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
- retCBSString = executecurl(url);
- } else {
- errorLogger.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
- }
-
- }
-
- public void writefile(String retCBSString, String configFile) {
- debugLogger.info("URL to fetch configuration:" + url);
-
- String indentedretstring = (new JSONObject(retCBSString)).toString(4);
-
- try (FileWriter file = new FileWriter(configFile)) {
- file.write(indentedretstring);
-
- debugLogger.info("Successfully Copied JSON Object to file " + configFile);
- } catch (IOException e) {
- errorLogger.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
- e.printStackTrace();
- }
-
- }
-
- public static String readFile(String filename) {
- String result = "";
- try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- }
- result = sb.toString();
- } catch (FileNotFoundException e) {
- errorLogger.error("colud not find file :",filename);
-
- } catch (Exception e) {
- errorLogger.error("unable to read the file , reason:",e.getCause());
- }
- return result;
- }
- private static String executecurl(String url) {
-
- String[] command = { "curl", "-v", url };
- ProcessBuilder process = new ProcessBuilder(command);
- Process p;
- String result = null;
- try {
- p = process.start();
- InputStreamReader ipr = new InputStreamReader(p.getInputStream());
- BufferedReader reader = new BufferedReader(ipr);
- StringBuilder builder = new StringBuilder();
- String line;
-
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- }
- result = builder.toString();
- reader.close();
- ipr.close();
- } catch (IOException e) {
- errorLogger.error("error", e);
- e.printStackTrace();
- }
- return result;
-
- }
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2019 TechMahindra
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.universalvesadapter.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ResourceUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Component
+public class FetchDynamicConfig {
+
+ private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
+ private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
+
+ private static String url;
+ public static String retString;
+ public static String retCBSString;
+ private static Map<String, String> env;
+
+ public FetchDynamicConfig() {}
+
+ public static void cbsCall(String configFile) {
+
+ env = System.getenv();
+ Boolean areEqual;
+ // Call consul api and identify the CBS Service address and port
+ getconsul();
+ // Construct and invoke CBS API to get application Configuration
+ getCBS();
+ // Verify if data has changed
+ areEqual = verifyConfigChange(configFile);
+
+ if (!areEqual) {
+ FetchDynamicConfig fc = new FetchDynamicConfig();
+ fc.writefile(retCBSString, configFile);
+ } else {
+ debugLogger.info("New config pull results identical - " + configFile
+ + " NOT refreshed");
+ }
+ }
+
+
+ private static void getconsul() {
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/"
+ + env.get("CONFIG_BINDING_SERVICE");
+ retString = executecurl(url);
+ debugLogger.info("CBS details fetched from Consul");
+ }
+
+ public static boolean verifyConfigChange(String configFile) {
+
+ boolean areEqual = false;
+ // Read current data
+ try {
+
+ File f = new File(
+ ClassLoader.getSystemResource(configFile.trim()).getFile());
+
+ if (f.exists() && !f.isDirectory()) {
+ debugLogger.info(
+ "Comparing local configuration with the configuration fethed from CBS ");
+
+ String jsonData = readFile(configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ JsonNode tree1 = mapper.readTree(jsonObject.toString());
+ JsonNode tree2 = mapper.readTree(retCBSString);
+ areEqual = tree1.equals(tree2);
+ debugLogger.info("Comparison value:" + areEqual);
+ } else {
+ debugLogger.info("First time config file read: " + configFile);
+ }
+
+ } catch (IOException e) {
+ errorLogger.error(
+ "Comparison with new fetched data failed" + e.getMessage());
+
+ }
+
+ return areEqual;
+
+ }
+
+ public static void getCBS() {
+
+ // consul return as array
+ JSONTokener temp = new JSONTokener(retString);
+ JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
+
+ String urlPart1 = null;
+ if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
+
+ urlPart1 = cbsjobj.getString("ServiceAddress") + ":"
+ + cbsjobj.getInt("ServicePort");
+
+ }
+ debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1);
+
+ if (env.containsKey("HOSTNAME")) {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retCBSString = executecurl(url);
+ debugLogger.info("Configuration fetched from CBS successfully..");
+ } else if (env.containsKey("SERVICE_NAME")) {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retCBSString = executecurl(url);
+ debugLogger.info("Configuration fetched from CBS successfully..");
+ } else {
+ errorLogger.error(
+ "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
+ }
+
+ }
+
+ public void writefile(String retCBSString, String configFile) {
+
+ String indentedretstring = (new JSONObject(retCBSString)).toString(4);
+ File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile());
+ try {
+ debugLogger.info("Overwriting local configuration file " + configFile
+ + " with configuartions received from CBS");
+
+
+ File file2 = ResourceUtils.getFile("classpath:" + configFile);
+ FileWriter fstream = new FileWriter(file2, false);
+ PrintWriter printWriter = new PrintWriter(fstream);
+ printWriter.print(indentedretstring);
+ printWriter.close();
+
+ debugLogger.info("New Config successfully written to local file to "
+ + configFile);
+ } catch (IOException e) {
+ errorLogger.error("Error in writing configuration into local KV file "
+ + configFile + retString + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ public static String readFile(String configFileName) {
+ String content = null;
+ File file = null;
+
+ try {
+ file = ResourceUtils.getFile("classpath:" + configFileName);
+ content = new String(Files.readAllBytes(file.toPath()));
+ } catch (FileNotFoundException e) {
+ errorLogger.error("colud not find file :", file.getName());
+
+ } catch (IOException e) {
+ errorLogger.error("unable to read the file , reason:", e.getCause());
+ } catch (Exception e) {
+ errorLogger.error("Exception occured , reason:", e.getMessage());
+ }
+
+ return content;
+
+ }
+
+ private static String executecurl(String url) {
+
+ String[] command = {"curl", "-v", url};
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ errorLogger.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
+
+ }
+
+}