From 8aa9abf52d06288025735d922d6f984c5b30d9e1 Mon Sep 17 00:00:00 2001 From: Pooja03 Date: Mon, 15 Apr 2019 20:40:26 +0530 Subject: Cloudify blueprint for VES mapper Adding Cloudify blueprint for VES mapper and some updates on code Change-Id: Idcbd9ec080717a80d04263a4baa6e4de9ce143c9 Issue-ID: DCAEGEN2-1176 Signed-off-by: Pooja03 --- .../domain/ves70/AlarmAdditionalInformation.java | 5 + .../adapter/UniversalEventAdapter.java | 303 +++++----- .../configs/DMaaPMRPublisherConfig.java | 505 ++++++++-------- .../configs/DMaaPMRSubscriberConfig.java | 638 ++++++++++----------- .../onap/universalvesadapter/dmaap/Creator.java | 178 +++--- .../service/VESAdapterInitializer.java | 552 +++++++++--------- .../universalvesadapter/service/VesService.java | 384 +++++++------ .../utils/CollectorConfigPropertyRetrival.java | 260 ++++++--- .../universalvesadapter/utils/DmaapConfig.java | 559 +++++++++--------- .../utils/FetchDynamicConfig.java | 419 +++++++------- .../src/main/resources/defaultSnmpMappingFile.xml | 60 +- UniversalVesAdapter/src/main/resources/kv.json | 46 +- UniversalVesAdapter/src/main/resources/kvTest.json | 15 - .../src/main/resources/mapper.properties | 82 +-- 14 files changed, 2077 insertions(+), 1929 deletions(-) delete mode 100644 UniversalVesAdapter/src/main/resources/kvTest.json (limited to 'UniversalVesAdapter/src/main') diff --git a/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java b/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java index afc63f8..b4760fd 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java +++ b/UniversalVesAdapter/src/main/java/org/onap/dcaegen2/ves/domain/ves70/AlarmAdditionalInformation.java @@ -58,6 +58,11 @@ public class AlarmAdditionalInformation { public void setAdditionalProperty(String name, String value) { this.additionalProperties.put(name, value); } + @JsonAnySetter + public void setAdditionalProperties(Map h) { + this.additionalProperties =h; + } + @Override public int hashCode() { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java index 09e85a0..483b19b 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java @@ -1,22 +1,23 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018-2019 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ +/*- + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2018-2019 TechMahindra + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + package org.onap.universalvesadapter.adapter; import java.io.ByteArrayInputStream; @@ -29,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PreDestroy; import org.milyn.Smooks; -import org.onap.dcaegen2.ves.domain.ves70.VesEvent; import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; import org.onap.universalvesadapter.exception.VesException; import org.onap.universalvesadapter.service.VESAdapterInitializer; @@ -58,128 +58,143 @@ import com.google.gson.JsonSyntaxException; @Component public class UniversalEventAdapter implements GenericAdapter { - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - - @Value("${defaultConfigFilelocation}") - private String defaultConfigFilelocation; - private String collectorIdentifierValue; - private String collectorIdentifierKey; - private Map eventToSmooksMapping = new ConcurrentHashMap<>(); - - public UniversalEventAdapter() { - - } - - /** - * transforms JSON to VES format and and returns the ves Event - * - * @param IncomingJason,eventType - * @return ves Event - */ - @Override - public String transform(String incomingJsonString) - throws ConfigFileSmooksConversionException, VesException { - String result = ""; - String configFileData; - - String identifier[]= CollectorConfigPropertyRetrival.getProperyArray("identifier",defaultConfigFilelocation ); - String defaultMappingFile="defaultMappingFile-"+Thread.currentThread().getName(); - try { - - Gson gson = new Gson(); - JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class); - - JsonElement results; - for(int i=0;i keys = object.keySet().iterator(); - while( keys.hasNext() ) { - String key = (String)keys.next(); - if ( object.get(key) instanceof JsonObject ) { - - jsonObject=(JsonObject) object.get(key); - JsonObject obj = keyObject(jsonObject, searchedKey); - exists = obj.has(searchedKey); - } - } - } - - return jsonObject; - } - + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + @Value("${defaultConfigFilelocation}") + private String defaultConfigFilelocation; + private String collectorIdentifierValue; + private String collectorIdentifierKey; + private Map eventToSmooksMapping = new ConcurrentHashMap<>(); + + public UniversalEventAdapter() { + + } + + /** + * transforms JSON to VES format and and returns the ves Event + * + * @param IncomingJason,eventType + * @return ves Event + */ + @Override + public String transform(String incomingJsonString) + throws ConfigFileSmooksConversionException, VesException { + String result = ""; + String configFileData; + + String identifier[] = CollectorConfigPropertyRetrival.getProperyArray("identifier", + defaultConfigFilelocation); + String defaultMappingFile = + "defaultMappingFile-" + Thread.currentThread().getName(); + try { + + Gson gson = new Gson(); + JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class); + + JsonElement results; + for (int i = 0; i < identifier.length; i++) { + JsonObject obj; + if ((obj = keyObject(body, identifier[i])).has(identifier[i])) { + collectorIdentifierKey = identifier[i]; + results = obj.get(identifier[i]); + collectorIdentifierValue = results.getAsString(); + + } + + } + // collectorIdentifierValue = collectorIdentifierValue.substring(0, + // collectorIdentifierValue.length() - 4); + if (collectorIdentifierKey.equals("notify OID")) { + collectorIdentifierValue = collectorIdentifierValue.substring(0, + collectorIdentifierValue.length() - 4); + } + + + if (VESAdapterInitializer.getMappingFiles() + .containsKey(collectorIdentifierValue)) { + configFileData = VESAdapterInitializer.getMappingFiles() + .get(collectorIdentifierValue); + debugLogger.debug( + "Using Mapping file as Mapping file is available for collector identifier:{}", + collectorIdentifierValue); + + } else { + + configFileData = VESAdapterInitializer.getMappingFiles() + .get(defaultMappingFile); + + debugLogger.debug( + "Using Default Mapping file as Mapping file is not available for Enterprise Id / identifer ID:{}", + collectorIdentifierValue); + } + + Smooks smooksTemp = new Smooks(new ByteArrayInputStream( + configFileData.getBytes(StandardCharsets.UTF_8))); + eventToSmooksMapping.put(collectorIdentifierKey, smooksTemp); + + Object vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp, + incomingJsonString); + debugLogger.info("Incoming json transformed to VES format successfully:" + + Thread.currentThread().getName()); + ObjectMapper objectMapper = new ObjectMapper(); + result = objectMapper.writeValueAsString(vesEvent); + debugLogger.info("Serialized VES json"); + } catch (JsonProcessingException exception) { + throw new VesException("Unable to convert pojo to VES format, Reason :{}", + exception); + } catch (SAXException | IOException exception) { + // Invalid Mapping file + exception.printStackTrace(); + errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, + exception.getMessage()); + + } catch (JsonSyntaxException exception) { + // Invalid Trap + errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}", + incomingJsonString, exception); + } catch (JsonParseException exception) { + // Invalid Trap + errorLogger.error("Dropping this Invalid json Trap :{}, Reason:{}", + incomingJsonString, exception); + } catch (RuntimeException exception) { + + exception.printStackTrace(); + errorLogger.error("Dropping this Trap :{},Reason:{}", incomingJsonString, + exception.getMessage()); + + } + return result; + } + + /** + * Closes all open smooks' instances before bean is destroyed + */ + @PreDestroy + public void destroy() { + for (Smooks smooks : eventToSmooksMapping.values()) + smooks.close(); + debugLogger.warn("All Smooks objects closed"); + } + + public JsonObject keyObject(JsonObject object, String searchedKey) { + boolean exists = object.has(searchedKey); + JsonObject jsonObject = object; + + if (!exists) { + Iterator keys = object.keySet().iterator(); + while (keys.hasNext()) { + String key = (String) keys.next(); + if (object.get(key) instanceof JsonObject) { + + jsonObject = (JsonObject) object.get(key); + JsonObject obj = keyObject(jsonObject, searchedKey); + exists = obj.has(searchedKey); + } + } + } + + return jsonObject; + } + } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java index 137e3af..4b15f55 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java @@ -1,252 +1,253 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - - -//TO-DO Lisence Issue ASK to Kedar??????????? -package org.onap.universalvesadapter.configs; - -import java.io.IOException; -import javax.annotation.Nonnull; -import org.onap.universalvesadapter.utils.DmaapConfig; -import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan; -import com.google.common.base.Objects; - -/** - *

- * Immutable DMaaP MR Configuration for DMaaP MR Publisher. - *

- * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber - * Configuration - *

- *

- * - * @author Rajiv Singla . Creation Date: 10/12/2016. - * - */ -@ComponentScan -public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig { - - - /** - * Publisher batching queue size - */ - private int maxBatchSize; - - /** - * Publisher Recovery Queue Size - */ - private int maxRecoveryQueueSize; - - /** - * Default uri path prefix - */ - private String dmaapUriPathPrefix ; - - private DMaaPMRPublisherConfig(Builder builder) { - this.hostName = builder.hostName; - this.portNumber = builder.portNumber; - this.topicName = builder.topicName; - this.protocol = builder.protocol; - this.userName = builder.userName; - this.userPassword = builder.userPassword; - this.contentType = builder.contentType; - this.maxBatchSize = builder.maxBatchSize; - this.maxRecoveryQueueSize = builder.maxRecoveryQueueSize; - this.dmaapUriPathPrefix = builder.dmaapUriPathPrefix; - } - - /** - * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object - */ - public static class Builder { - - private String hostName; - private Integer portNumber; - private String topicName; - private String userName; - private String userPassword; - private String protocol; - private String contentType; - private int maxBatchSize; - private int maxRecoveryQueueSize; - private String dmaapUriPathPrefix ; - - - - public Builder(@Nonnull String hostName, @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { - this.hostName = hostName; - this.topicName = topicName; - // Default values - this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); - this.userName = dmaapConfig.getDEFAULT_USER_NAME(); - this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); - this.protocol =dmaapConfig.getDEFAULT_PROTOCOL(); - this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); - - this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(); - this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(); - this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); - } - - /** - * Setup for custom host port number - Defaults to 80. - * - * @param portNumber - * custom port number - * @return Builder object itself for chaining - */ - public Builder setPortNumber(@Nonnull Integer portNumber) { - this.portNumber = portNumber; - return this; - } - - /** - * Setup user name for authentication. If no username is provided authentication - * will be disabled - * - * @param userName - * user name for DMaaP Topic Authentication - * @return Builder object itself for chaining - */ - public Builder setUserName(@Nonnull String userName) { - this.userName = userName; - return this; - } - - /** - * Setup user password for authentication. If no password is provided - * authentication will be disabled - * - * @param userPassword - * user password for DMaaP Topic Authentication - * @return Builder object itself for chaining - */ - public Builder setUserPassword(@Nonnull String userPassword) { - this.userPassword = userPassword; - return this; - } - - /** - * Setup custom Publisher protocol - Defaults to https. Note: Only http and - * https are currently supported. - * - * @param protocol - * protocol e.g. https - * @return Builder object itself for chaining - */ - public Builder setProtocol(@Nonnull String protocol) { - this.protocol = normalizeValidateProtocol(protocol); - return this; - } - - /** - * Setup custom Publisher content-type - Defaults to application/json - * - * @param contentType - * content type e.g. application/json - * @return Builder object itself for chaining - */ - public Builder setContentType(@Nonnull String contentType) { - final String normalizedContentType = normalizeValidateContentType(contentType); - this.contentType = normalizedContentType; - return this; - } - - /** - * Setup custom Publisher Max Batch Size - Defaults to 100 - * - * @param maxBatchSize - * max Batch Size - * @return Builder object itself for chaining - */ - public Builder setMaxBatchSize(int maxBatchSize) { - this.maxBatchSize = maxBatchSize; - return this; - } - - /** - * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold - * messages temporarily in case DMaaP MR Publisher topic is not responding for - * any reason. Defaults to 100,000 - * - * @param maxRecoveryQueueSize - * max recovery queue size - * @return Builder object itself for chaining - */ - public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) { - this.maxRecoveryQueueSize = maxRecoveryQueueSize; - return this; - } - - - /** - * Creates immutable instance of {@link DMaaPMRPublisherConfig} - * - * @return Builds and returns thread safe, immutable - * {@link DMaaPMRPublisherConfig} object - */ - public DMaaPMRPublisherConfig build() { - return new DMaaPMRPublisherConfig(this); - } - - } - public String getDmaapUriPathPrefix() { - return dmaapUriPathPrefix; - } - - /** - * Returns max Publisher Batch Queue Size - * - * @return max Publisher Batch Queue size - */ - public int getMaxBatchSize() { - return maxBatchSize; - } - - /** - * Returns max Publisher Recovery Queue Size - * - * @return max Recovery Queue size - */ - public int getMaxRecoveryQueueSize() { - return maxRecoveryQueueSize; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o; - return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize; - } - - @Override - public int hashCode() { - return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize); - } - -} +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + + +//TO-DO Lisence Issue ASK to Kedar??????????? +package org.onap.universalvesadapter.configs; + +import java.io.IOException; +import javax.annotation.Nonnull; +import org.onap.universalvesadapter.utils.DmaapConfig; +import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan; +import com.google.common.base.Objects; + +/** + *

+ * Immutable DMaaP MR Configuration for DMaaP MR Publisher. + *

+ * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber + * Configuration + *

+ *

+ * + * @author Rajiv Singla . Creation Date: 10/12/2016. + * + */ +@ComponentScan +public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig { + + + /** + * Publisher batching queue size + */ + private int maxBatchSize; + + /** + * Publisher Recovery Queue Size + */ + private int maxRecoveryQueueSize; + + /** + * Default uri path prefix + */ + private String dmaapUriPathPrefix ; + + private DMaaPMRPublisherConfig(Builder builder) { + this.hostName = builder.hostName; + this.portNumber = builder.portNumber; + this.topicName = builder.topicName; + this.protocol = builder.protocol; + this.userName = builder.userName; + this.userPassword = builder.userPassword; + this.contentType = builder.contentType; + this.maxBatchSize = builder.maxBatchSize; + this.maxRecoveryQueueSize = builder.maxRecoveryQueueSize; + this.dmaapUriPathPrefix = builder.dmaapUriPathPrefix; + } + + /** + * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private int maxBatchSize; + private int maxRecoveryQueueSize; + private String dmaapUriPathPrefix ; + + + + public Builder(@Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { +this.topicName = topicName; +this.hostName = dmaapConfig.getDmaaphost(); + this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); + // Default values + + this.userName = dmaapConfig.getDEFAULT_USER_NAME(); + this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); + this.protocol =dmaapConfig.getDEFAULT_PROTOCOL(); + this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); + + this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(); + this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(); + this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); + } + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber + * custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + /** + * Setup user name for authentication. If no username is provided authentication + * will be disabled + * + * @param userName + * user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + /** + * Setup user password for authentication. If no password is provided + * authentication will be disabled + * + * @param userPassword + * user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + /** + * Setup custom Publisher protocol - Defaults to https. Note: Only http and + * https are currently supported. + * + * @param protocol + * protocol e.g. https + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + /** + * Setup custom Publisher content-type - Defaults to application/json + * + * @param contentType + * content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + /** + * Setup custom Publisher Max Batch Size - Defaults to 100 + * + * @param maxBatchSize + * max Batch Size + * @return Builder object itself for chaining + */ + public Builder setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + + /** + * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold + * messages temporarily in case DMaaP MR Publisher topic is not responding for + * any reason. Defaults to 100,000 + * + * @param maxRecoveryQueueSize + * max recovery queue size + * @return Builder object itself for chaining + */ + public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) { + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + return this; + } + + + /** + * Creates immutable instance of {@link DMaaPMRPublisherConfig} + * + * @return Builds and returns thread safe, immutable + * {@link DMaaPMRPublisherConfig} object + */ + public DMaaPMRPublisherConfig build() { + return new DMaaPMRPublisherConfig(this); + } + + } + public String getDmaapUriPathPrefix() { + return dmaapUriPathPrefix; + } + + /** + * Returns max Publisher Batch Queue Size + * + * @return max Publisher Batch Queue size + */ + public int getMaxBatchSize() { + return maxBatchSize; + } + + /** + * Returns max Publisher Recovery Queue Size + * + * @return max Recovery Queue size + */ + public int getMaxRecoveryQueueSize() { + return maxRecoveryQueueSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o; + return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize); + } + +} \ No newline at end of file diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java index 703684f..a00091a 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java @@ -1,319 +1,319 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.onap.universalvesadapter.configs; - -import java.io.IOException; - -import javax.annotation.Nonnull; - -import org.onap.universalvesadapter.utils.DmaapConfig; - -import com.google.common.base.Objects; - -/** - *

- * Immutable DMaaP MR Configuration for Subscriber. - *

- * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration - *

- * - * @author Rajiv Singla . Creation Date: 10/12/2016. - */ -public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig { - - private final String consumerId; - private final String consumerGroup; - private final int timeoutMS; - private final int messageLimit; - private String timeoutMSParam; - private String messageLimitParam; - private String uriPrefix; - - public DMaaPMRSubscriberConfig(Builder builder) { - this.hostName = builder.hostName; - this.portNumber = builder.portNumber; - this.topicName = builder.topicName; - this.protocol = builder.protocol; - this.userName = builder.userName; - this.userPassword = builder.userPassword; - this.contentType = builder.contentType; - this.consumerId = builder.consumerId; - this.consumerGroup = builder.consumerGroup; - this.timeoutMS = builder.timeoutMS; - this.messageLimit = builder.messageLimit; - this.timeoutMSParam = builder.timeoutMSParam; - this.messageLimitParam = builder.messageLimitParam; - this.uriPrefix = builder.uriPreifix; - } - - /** - * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object - */ - public static class Builder { - - private String hostName; - private Integer portNumber; - private String topicName; - private String userName; - private String userPassword; - private String protocol; - private String contentType; - private String consumerId; - private String consumerGroup; - private int timeoutMS; - private int messageLimit; - private String timeoutMSParam; - private String messageLimitParam; - private String uriPreifix; - - - - public Builder(@Nonnull String hostName, - @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { - - // Required Values - this.hostName = hostName; - this.topicName = topicName; - // Default values - this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); - this.userName = dmaapConfig.getDEFAULT_USER_NAME(); - this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); - this.protocol = dmaapConfig.getDEFAULT_PROTOCOL(); - this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); - this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID(); - this.consumerGroup = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(); - this.timeoutMS =dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(); - this.messageLimit = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(); - this.timeoutMSParam=dmaapConfig.getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(); - this.messageLimitParam=dmaapConfig.getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(); - this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); - - } - - - /** - * Setup for custom host port number - Defaults to 80. - * - * @param portNumber custom port number - * @return Builder object itself for chaining - */ - public Builder setPortNumber(@Nonnull Integer portNumber) { - this.portNumber = portNumber; - return this; - } - - - /** - * Setup user name for authentication. If no username is provided authentication will be disabled - * - * @param userName user name for DMaaP Topic Authentication - * @return Builder object itself for chaining - */ - public Builder setUserName(@Nonnull String userName) { - this.userName = userName; - return this; - } - - - /** - * Setup user password for authentication. If no password is provided authentication will be disabled - * - * @param userPassword user password for DMaaP Topic Authentication - * @return Builder object itself for chaining - */ - public Builder setUserPassword(@Nonnull String userPassword) { - this.userPassword = userPassword; - return this; - } - - - /** - * Setup custom Subscriber protocol - Defaults to https. - * Note: Only http and https are currently supported. - * - * @param protocol protocol e.g. https or http - * @return Builder object itself for chaining - */ - public Builder setProtocol(@Nonnull String protocol) { - - this.protocol = normalizeValidateProtocol(protocol); - return this; - } - - /** - * Setup custom Subscriber content-type - Defaults to application/json - * - * @param contentType content type e.g. application/json - * @return Builder object itself for chaining - */ - public Builder setContentType(@Nonnull String contentType) { - final String normalizedContentType = normalizeValidateContentType(contentType); - this.contentType = normalizedContentType; - return this; - } - - - /** - * Setup custom Consumer Id - Defaults to random Id - * - * @param consumerId - custom consumer ID - * @return Builder object itself for chaining - */ - public Builder setConsumerId(@Nonnull String consumerId) { - this.consumerId = consumerId; - return this; - } - - /** - * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID - * - * @param consumerGroup - custom Consumer Group - * @return Builder object itself for chaining - */ - public Builder setConsumerGroup(@Nonnull String consumerGroup) { - this.consumerGroup = consumerGroup; - return this; - } - - /** - * Setup Custom Subscriber timeout in ms - Default to no timeout limit - * - * @param timeoutMS timeout in milliseconds - * @return Builder object itself for chaining - */ - public Builder setTimeoutMS(@Nonnull int timeoutMS) { - this.timeoutMS = timeoutMS; - return this; - } - - /** - * Setup custom Subscriber Message Limit - Default to no limit - * - * @param messageLimit message Limit - * @return Builder object itself for chaining - */ - public Builder setMessageLimit(@Nonnull int messageLimit) { - this.messageLimit = messageLimit; - return this; - } - - /** - * Builds Immutable instance of {@link DMaaPMRSubscriberConfig} - * - * @return immutable DMaaP Subscriber Config Object - */ - public DMaaPMRSubscriberConfig build() { - return new DMaaPMRSubscriberConfig(this); - } - - } - - - public String getTimeoutMSParam() { - return timeoutMSParam; - } - - public void setTimeoutMSParam(String timeoutMSParam) { - this.timeoutMSParam = timeoutMSParam; - } - - public String getMessageLimitParam() { - return messageLimitParam; - } - - public void setMessageLimitParam(String messageLimitParam) { - this.messageLimitParam = messageLimitParam; - } - - public String getUriPrefix() { - return uriPrefix; - } - - public void setUriPrefix(String uriPreifix) { - this.uriPrefix = uriPreifix; - } - - /** - * DMaaP MR Subscriber Consumer Id - * - * @return consumer Id - */ - public String getConsumerId() { - return consumerId; - } - - /** - * DMaaP MR Subscriber Consumer Group - * - * @return consumer group - */ - public String getConsumerGroup() { - return consumerGroup; - } - - /** - * DMaaP MR Subscriber Timeout in ms - * - * @return subscriber timeout ms - */ - public int getTimeoutMS() { - return timeoutMS; - } - - /** - * DMaaP MR Subscriber message limit - * - * @return subscriber message limit - */ - public int getMessageLimit() { - return messageLimit; - } - - /** - * DMaaP property file - * - * @return Properties - */ - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o; - return Objects.equal(consumerId, that.consumerId) && - Objects.equal(consumerGroup, that.consumerGroup) && - Objects.equal(timeoutMS, that.timeoutMS) && - Objects.equal(messageLimit, that.messageLimit); - } - - @Override - public int hashCode() { - return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit); - } - -} +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.configs; + +import java.io.IOException; + +import javax.annotation.Nonnull; + +import org.onap.universalvesadapter.utils.DmaapConfig; + +import com.google.common.base.Objects; + +/** + *

+ * Immutable DMaaP MR Configuration for Subscriber. + *

+ * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration + *

+ * + * @author Rajiv Singla . Creation Date: 10/12/2016. + */ +public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig { + + private final String consumerId; + private final String consumerGroup; + private final int timeoutMS; + private final int messageLimit; + private String timeoutMSParam; + private String messageLimitParam; + private String uriPrefix; + + public DMaaPMRSubscriberConfig(Builder builder) { + this.hostName = builder.hostName; + this.portNumber = builder.portNumber; + this.topicName = builder.topicName; + this.protocol = builder.protocol; + this.userName = builder.userName; + this.userPassword = builder.userPassword; + this.contentType = builder.contentType; + this.consumerId = builder.consumerId; + this.consumerGroup = builder.consumerGroup; + this.timeoutMS = builder.timeoutMS; + this.messageLimit = builder.messageLimit; + this.timeoutMSParam = builder.timeoutMSParam; + this.messageLimitParam = builder.messageLimitParam; + this.uriPrefix = builder.uriPreifix; + } + + /** + * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private String consumerId; + private String consumerGroup; + private int timeoutMS; + private int messageLimit; + private String timeoutMSParam; + private String messageLimitParam; + private String uriPreifix; + + + + public Builder(@Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { + + // Required Values + this.hostName = dmaapConfig.getDmaaphost(); + this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); + this.topicName = topicName; + // Default values + + this.userName = dmaapConfig.getDEFAULT_USER_NAME(); + this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); + this.protocol = dmaapConfig.getDEFAULT_PROTOCOL(); + this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); + this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID(); + this.consumerGroup = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(); + this.timeoutMS =dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(); + this.messageLimit = dmaapConfig.getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(); + this.timeoutMSParam=dmaapConfig.getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(); + this.messageLimitParam=dmaapConfig.getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(); + this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); + + } + + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + + /** + * Setup user name for authentication. If no username is provided authentication will be disabled + * + * @param userName user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + + /** + * Setup user password for authentication. If no password is provided authentication will be disabled + * + * @param userPassword user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + + /** + * Setup custom Subscriber protocol - Defaults to https. + * Note: Only http and https are currently supported. + * + * @param protocol protocol e.g. https or http + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + /** + * Setup custom Subscriber content-type - Defaults to application/json + * + * @param contentType content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + + /** + * Setup custom Consumer Id - Defaults to random Id + * + * @param consumerId - custom consumer ID + * @return Builder object itself for chaining + */ + public Builder setConsumerId(@Nonnull String consumerId) { + this.consumerId = consumerId; + return this; + } + + /** + * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID + * + * @param consumerGroup - custom Consumer Group + * @return Builder object itself for chaining + */ + public Builder setConsumerGroup(@Nonnull String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + /** + * Setup Custom Subscriber timeout in ms - Default to no timeout limit + * + * @param timeoutMS timeout in milliseconds + * @return Builder object itself for chaining + */ + public Builder setTimeoutMS(@Nonnull int timeoutMS) { + this.timeoutMS = timeoutMS; + return this; + } + + /** + * Setup custom Subscriber Message Limit - Default to no limit + * + * @param messageLimit message Limit + * @return Builder object itself for chaining + */ + public Builder setMessageLimit(@Nonnull int messageLimit) { + this.messageLimit = messageLimit; + return this; + } + + /** + * Builds Immutable instance of {@link DMaaPMRSubscriberConfig} + * + * @return immutable DMaaP Subscriber Config Object + */ + public DMaaPMRSubscriberConfig build() { + return new DMaaPMRSubscriberConfig(this); + } + + } + + + public String getTimeoutMSParam() { + return timeoutMSParam; + } + + public void setTimeoutMSParam(String timeoutMSParam) { + this.timeoutMSParam = timeoutMSParam; + } + + public String getMessageLimitParam() { + return messageLimitParam; + } + + public void setMessageLimitParam(String messageLimitParam) { + this.messageLimitParam = messageLimitParam; + } + + public String getUriPrefix() { + return uriPrefix; + } + + public void setUriPrefix(String uriPreifix) { + this.uriPrefix = uriPreifix; + } + + /** + * DMaaP MR Subscriber Consumer Id + * + * @return consumer Id + */ + public String getConsumerId() { + return consumerId; + } + + /** + * DMaaP MR Subscriber Consumer Group + * + * @return consumer group + */ + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * DMaaP MR Subscriber Timeout in ms + * + * @return subscriber timeout ms + */ + public int getTimeoutMS() { + return timeoutMS; + } + + /** + * DMaaP MR Subscriber message limit + * + * @return subscriber message limit + */ + public int getMessageLimit() { + return messageLimit; + } + + /** + * DMaaP property file + * + * @return Properties + */ + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o; + return Objects.equal(consumerId, that.consumerId) && + Objects.equal(consumerGroup, that.consumerGroup) && + Objects.equal(timeoutMS, that.timeoutMS) && + Objects.equal(messageLimit, that.messageLimit); + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit); + } + +} \ No newline at end of file diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java index ce75a6e..1393e1b 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java @@ -1,91 +1,87 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.dmaap; - -import java.io.IOException; - -import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; -import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; -import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; -import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; -import org.onap.universalvesadapter.utils.DmaapConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class Creator { - - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - private DMaaPMRFactory dMaaPMRFactoryInstance; - private String dmaaphost; - private String publisherTopic; - @Autowired - private DmaapConfig dmaapConfig; - - // prop initializer - - public void propertyFileInitializer() { - - this.dmaaphost = dmaapConfig.getDmaaphost(); - this.publisherTopic = dmaapConfig.getPublisherTopic(); - this.dMaaPMRFactoryInstance = DMaaPMRFactory.create(); - debugLogger.info("The Hostname of DMaap is :" + dmaaphost); - - - } - - // publisher - public DMaaPMRPublisher getDMaaPMRPublisher(){ - propertyFileInitializer(); - DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null; - try { - dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(dmaaphost, publisherTopic,dmaapConfig).build(); - } catch (IOException e) { - errorLogger.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause()); - } - return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig); - } - - // subscriber - public DMaaPMRSubscriber getDMaaPMRSubscriber(String subcriberTopic){ - propertyFileInitializer(); - DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null; - try { - dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(dmaaphost, subcriberTopic, dmaapConfig).build(); - } catch (IOException e) { - - errorLogger.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause()); - } - - return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig); - - } - public void setDmaapConfig(DmaapConfig dmaapConfig) { - this.dmaapConfig = dmaapConfig; - } - - public DmaapConfig getDmaapConfig() { - return dmaapConfig; - } - -} +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.universalvesadapter.dmaap; + +import java.io.IOException; + +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.utils.DmaapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class Creator { + + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + private DMaaPMRFactory dMaaPMRFactoryInstance; + + + @Autowired + private DmaapConfig dmaapConfig; + + // prop initializer + + public void propertyFileInitializer() { + + this.dMaaPMRFactoryInstance = DMaaPMRFactory.create(); + + } + + // publisher + public DMaaPMRPublisher getDMaaPMRPublisher(String publisherTopic){ + propertyFileInitializer(); + DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null; + try { + dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(publisherTopic,dmaapConfig).build(); + } catch (IOException e) { + errorLogger.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause()); + } + return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig); + } + + // subscriber + public DMaaPMRSubscriber getDMaaPMRSubscriber(String subcriberTopic){ + propertyFileInitializer(); + DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null; + try { + dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(subcriberTopic, dmaapConfig).build(); + } catch (IOException e) { + + errorLogger.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause()); + } + + return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig); + + } + public void setDmaapConfig(DmaapConfig dmaapConfig) { + this.dmaapConfig = dmaapConfig; + } + + public DmaapConfig getDmaapConfig() { + return dmaapConfig; + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java index 671fcc3..f2adc9b 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java @@ -1,290 +1,262 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018-2019 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.service; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.onap.universalvesadapter.dmaap.Creator; -import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival; -import org.onap.universalvesadapter.utils.DmaapConfig; -import org.onap.universalvesadapter.utils.FetchDynamicConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ApplicationContext; -import org.springframework.core.Ordered; -import org.springframework.stereotype.Component; - -//AdapterInitializer -@Component -public class VESAdapterInitializer implements CommandLineRunner, Ordered { - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - - @Autowired - private Creator creator; - @Autowired - private DmaapConfig dmaapConfig; - @Value("${defaultConfigFilelocation}") - String defaultConfigFilelocation; - @Value("${server.port}") - String serverPort; - - private static Map mappingFiles = new HashMap(); - private static Map env; - - @Autowired - private ApplicationContext applicationContext; - - @Override - public void run(String... args) throws Exception { - debugLogger.info("The Default Config file Location:" + defaultConfigFilelocation.trim()); - - if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) { - errorLogger.error("Default Config file " + defaultConfigFilelocation.trim() + " is missing"); - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error("Application stoped due to missing default Config file"); - return -1; - })); - } - env = System.getenv(); - for (Map.Entry entry : env.entrySet()) { - debugLogger.debug(entry.getKey() + ":" + entry.getValue()); - } - - //checks for DMaaP Host and Port No - if( (env.get("DMAAPHOST")==null ||(env.get("MR_DEFAULT_PORT_NUMBER")==null))) { - - errorLogger.error("DMAAPHOST,MR_DEFAULT_PORT_NUMBER environment parameter is missing. Sample Usage is -\n sudo docker run -d -p 8085:8085/tcp --env MR_DEFAULT_PORT_NUMBER=3904 --env CONSUL_HOST=10.53.172.109 --env HOSTNAME=mvp-dcaegen2-service-mua --env CONFIG_BINDING_SERVICE=config_binding_service --env DMAAPHOST='10.53.172.156' nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest"); - - System.exit(SpringApplication.exit(applicationContext, () -> {errorLogger.error("Application stoped due missing DMAAPHOST or MR_DEFAULT_PORT_NUMBER environment varibales.Please refer above example for environment varibales to pass ");return-1;})); - } - - dmaapConfig.setDmaaphost(env.get("DMAAPHOST")); - dmaapConfig.setDEFAULT_PORT_NUMBER(Integer.parseInt(env.get("MR_DEFAULT_PORT_NUMBER"))); - creator.setDmaapConfig(dmaapConfig); - //check for consul details - if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) { - debugLogger.info(">>>Dynamic configuration to be used"); - FetchDynamicConfig.cbsCall(defaultConfigFilelocation); - - } else { - debugLogger.info(">>>Static configuration to be used"); - - } - readJsonToMap(defaultConfigFilelocation); - - //prepareDatabase(); - //fetchMappingFile(); - - debugLogger.info("Triggering controller's start url "); - executecurl("http://localhost:"+serverPort+"/start"); - } - - - private static String executecurl(String url) { - - debugLogger.info("Running curl command for url:{}",url); - String[] command = { "curl", "-v", url }; - ProcessBuilder process = new ProcessBuilder(command); - Process p; - String result = null; - try { - p = process.start(); - try(InputStreamReader ipr = new InputStreamReader(p.getInputStream()); - BufferedReader reader = new BufferedReader(ipr)){ - StringBuilder builder = new StringBuilder(); - String line; - - while ((line = reader.readLine()) != null) { - builder.append(line); - } - result = builder.toString(); - } - } catch (IOException e) { - errorLogger.error("error", e); - } - return result; - - } - - private void readJsonToMap(String configFile) { - try { - JSONArray collectorArray=CollectorConfigPropertyRetrival.collectorConfigArray(configFile); - - for (int i = 0; i < collectorArray.size(); i++) { - JSONObject obj2 = (JSONObject) collectorArray.get(i); - - if (obj2.containsKey("mapping-files")) { - - JSONArray a1 = (JSONArray) obj2.get("mapping-files"); - - for (int j = 0; j < a1.size(); j++) { - JSONObject obj3 = (JSONObject) a1.get(j); - Set> set = obj3.entrySet(); - - for (Entry entry : set) { - - mappingFiles.put(entry.getKey(), entry.getValue()); - } - } - - } - } - - } catch (Exception e) { - errorLogger.error("Exception occured while reading Collector config file cause: ",e.getCause()); - } - - } - - - /* private void prepareDatabase() throws IOException { - - - debugLogger.info("The Default Mapping file Location:" + defaultMappingFileLocation.trim()); - - if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) { - errorLogger.error( - "Default mapping file " + defaultMappingFileLocation.trim() + " is missing"); - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error("Application stoped due to missing default mapping file"); - return -1; - })); - } - - File file = new File( - ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile()); - - try (FileInputStream fileInputStream = new FileInputStream(file)) { - bytesArray = new byte[(int) file.length()]; - fileInputStream.read(bytesArray); - - } catch (IOException e1) { - errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " - + e1.getMessage(), e1); - // exit on missing default mapping file - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error("Application stoped due to missing default mapping file"); - return -1; - })); - } - - try (Connection con = DriverManager.getConnection(dBurl, user, pwd); - // creating table if not exist - PreparedStatement pstmt11 = - con.prepareStatement("CREATE TABLE IF NOT EXISTS public." - + MappingFileTableName + "\r\n" + "(\r\n" - + " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" - + " mappingfilecontents bytea,\r\n" - + " mimetype character varying COLLATE pg_catalog.\"default\",\r\n" - + " file_name character varying COLLATE pg_catalog.\"default\",\r\n" - + " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" - + ")\r\n" + "WITH (\r\n" + " OIDS = FALSE\r\n" + ")\r\n" - + "TABLESPACE pg_default;")) { - - metricsLogger.info("Postgresql Connection successful..."); - debugLogger.debug("Connection object:{}" , con.toString()); - - pstmt11.executeUpdate(); - debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully...."); - - if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) { - - try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " - + MappingFileTableName - + "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;")) { - pstmt.setString(1, defaultEnterpriseId); - pstmt.setBytes(2, bytesArray); - pstmt.setString(3, "text/xml"); - pstmt.setString(4, file.getName()); - - pstmt.executeUpdate(); - debugLogger.info("Made sure that default mapping file is present in table"); - } - } else { - errorLogger.error(file.getName() + " is empty"); - // exit on empty mapping file - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error("Application stoped beacuase default mapping file is empty.."); - return -1; - })); - } - - } catch (SQLException e) { - errorLogger.error("Received exception : " + e.getMessage(), e); - // exit on SqlException - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error("Application Stoped due to ", e.getCause()); - return -1; - })); - } - - }*/ - /*public void fetchMappingFile() { - - try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) { - debugLogger.info("Retrieving data from DB"); - PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file"); - ResultSet rs = pstmt.executeQuery(); - // parsing the column each time is a linear search - int column1Pos = rs.findColumn("enterpriseid"); - int column2Pos = rs.findColumn("mappingfilecontents"); - String hexString; - while (rs.next()) { - String column1 = rs.getString(column1Pos); - String column2 = rs.getString(column2Pos); - hexString = column2.substring(2); - byte[] bytes = Hex.decodeHex(hexString.toCharArray()); - String data = new String(bytes, "UTF-8"); - mappingFiles.put(column1, data); - } - debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + mappingFiles.size()); - } catch (Exception e) { - errorLogger.error("Error occured due to :" + e.getMessage()); - e.printStackTrace(); - } - - }*/ - - public static Map getMappingFiles() { - return mappingFiles; - } - - public static void setMappingFiles(Map mappingFiles) { - VESAdapterInitializer.mappingFiles = mappingFiles; - } - - @Override - public int getOrder() { - return 0; - } - -} +/*- + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2018-2019 TechMahindra + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.universalvesadapter.service; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival; +import org.onap.universalvesadapter.utils.FetchDynamicConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.context.ApplicationContext; +import org.springframework.core.Ordered; +import org.springframework.stereotype.Component; + +// AdapterInitializer +@Component +public class VESAdapterInitializer implements CommandLineRunner, Ordered { + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + @Value("${defaultConfigFilelocation}") + String defaultConfigFilelocation; + @Value("${server.port}") + String serverPort; + + private static Map mappingFiles = new HashMap(); + private static Map env; + + @Autowired + private ApplicationContext applicationContext; + + @Override + public void run(String... args) throws Exception { + debugLogger.info("The Default Config file Location:" + + defaultConfigFilelocation.trim()); + // final Path configFilePath = + // Paths.get(defaultConfigFilelocation.trim()).toAbsolutePath(); + // File f = new File(configFilePath.toString()); + + if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) { + errorLogger.error("Default Config file " + defaultConfigFilelocation.trim() + + " is missing"); + System.exit(SpringApplication.exit(applicationContext, () -> { + errorLogger.error( + "Application stoped due to missing default Config file"); + return -1; + })); + } + env = System.getenv(); + for (Map.Entry entry : env.entrySet()) { + debugLogger.debug(entry.getKey() + ":" + entry.getValue()); + } + + // check for consul details + if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") + && env.containsKey("HOSTNAME")) { + debugLogger.info(">>>Dynamic configuration to be used"); + FetchDynamicConfig.cbsCall(defaultConfigFilelocation); + + } else { + debugLogger.info(">>>Static configuration to be used"); + + } + readJsonToMap(defaultConfigFilelocation); + + // prepareDatabase(); + // fetchMappingFile(); + + debugLogger.info("Triggering controller's start url "); + executecurl("http://localhost:" + serverPort + "/start"); + } + + + private static String executecurl(String url) { + + debugLogger.info("Running curl command for url:{}", url); + String[] command = {"curl", "-v", url}; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + try (InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr)) { + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + } + } catch (IOException e) { + errorLogger.error("error", e); + } + return result; + + } + + private void readJsonToMap(String configFile) { + try { + JSONArray collectorArray = CollectorConfigPropertyRetrival + .collectorConfigArray(configFile); + + for (int i = 0; i < collectorArray.size(); i++) { + JSONObject obj2 = (JSONObject) collectorArray.get(i); + + if (obj2.containsKey("mapping-files")) { + + JSONArray a1 = (JSONArray) obj2.get("mapping-files"); + + for (int j = 0; j < a1.size(); j++) { + JSONObject obj3 = (JSONObject) a1.get(j); + Set> set = obj3.entrySet(); + + for (Entry entry : set) { + + mappingFiles.put(entry.getKey(), + entry.getValue()); + } + } + + } + } + + } catch (Exception e) { + e.printStackTrace(); + errorLogger.error( + "Exception occured while reading Collector config file cause: ", + e.getCause()); + } + + } + + + /* + * private void prepareDatabase() throws IOException { + * + * + * debugLogger.info("The Default Mapping file Location:" + + * defaultMappingFileLocation.trim()); + * + * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) { + * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() + + * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped due to missing default mapping file"); return -1; + * })); } + * + * File file = new File( + * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile()); + * + * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new + * byte[(int) file.length()]; fileInputStream.read(bytesArray); + * + * } catch (IOException e1) { + * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " + + * e1.getMessage(), e1); // exit on missing default mapping file + * System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped due to missing default mapping file"); return -1; + * })); } + * + * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if + * not exist PreparedStatement pstmt11 = + * con.prepareStatement("CREATE TABLE IF NOT EXISTS public." + MappingFileTableName + "\r\n" + * + "(\r\n" + + * " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" + + * " mappingfilecontents bytea,\r\n" + + * " mimetype character varying COLLATE pg_catalog.\"default\",\r\n" + + * " file_name character varying COLLATE pg_catalog.\"default\",\r\n" + + * " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" + + * "WITH (\r\n" + " OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) { + * + * metricsLogger.info("Postgresql Connection successful..."); + * debugLogger.debug("Connection object:{}" , con.toString()); + * + * pstmt11.executeUpdate(); + * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully...."); + * + * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) { + * + * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName + * + + * "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;" + * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray); + * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName()); + * + * pstmt.executeUpdate(); + * debugLogger.info("Made sure that default mapping file is present in table"); } } else { + * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file + * System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return + * -1; })); } + * + * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(), + * e); // exit on SqlException System.exit(SpringApplication.exit(applicationContext, () -> + * { errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); } + * + * } + */ + /* + * public void fetchMappingFile() { + * + * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) { + * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt = + * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery(); + * // parsing the column each time is a linear search int column1Pos = + * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents"); + * String hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String + * column2 = rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes = + * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8"); + * mappingFiles.put(column1, data); } + * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + + * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :" + * + e.getMessage()); e.printStackTrace(); } + * + * } + */ + + public static Map getMappingFiles() { + return mappingFiles; + } + + public static void setMappingFiles(Map mappingFiles) { + VESAdapterInitializer.mappingFiles = mappingFiles; + } + + @Override + public int getOrder() { + return 0; + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index a5b88ec..cbfeead 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -1,164 +1,220 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.service; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.onap.universalvesadapter.adapter.UniversalEventAdapter; -import org.onap.universalvesadapter.dmaap.Creator; -import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; -import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; -import org.onap.universalvesadapter.exception.DMaapException; -import org.onap.universalvesadapter.exception.MapperConfigException; -import org.onap.universalvesadapter.exception.VesException; -import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival; -import org.onap.universalvesadapter.utils.DmaapConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -/** - * Service that starts the universal ves adapter module to listen for events - * - * @author kmalbari - * - */ -@Component -public class VesService { - - private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger"); - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - - private boolean isRunning = true; - @Value("${defaultConfigFilelocation}") - private String defaultConfigFilelocation; - @Autowired - private Creator creator; - @Autowired - private UniversalEventAdapter eventAdapter; - @Autowired - private DmaapConfig dmaapConfig; - private static List list = new LinkedList(); - - - /** - * method triggers universal VES adapter module. - */ - public void start() throws MapperConfigException { - debugLogger.info("Creating Subcriber and Publisher with creator............."); - - - DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(); - - String topicArray[]= CollectorConfigPropertyRetrival.getProperyArray("subscriberTopic",defaultConfigFilelocation); - - - ExecutorService executorService=Executors.newFixedThreadPool(topicArray.length); - for(int i=0;i) list).removeFirst(); - List messages = new ArrayList<>(); - String vesEvent = processReceivedJson(val); - if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) { - messages.add(vesEvent); - publisher.publish(messages); - metricsLogger.info("Message successfully published to DMaaP Topic"); - } - - } - - } - } - - - - } - }); - } - - - - } - - /** - * method stops universal ves adapter module - */ - public void stop() { - isRunning = false; - } - - private String processReceivedJson(String incomingJsonString) { - String outgoingJsonString = null; - if (!"".equals(incomingJsonString)) { - - try { - - outgoingJsonString = eventAdapter.transform(incomingJsonString); - - } catch (VesException exception) { - errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception); - debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); - } catch (DMaapException e) { - errorLogger.error("Received exception : {}", e.getMessage()); - } - } - return outgoingJsonString; - } -} - +/*- + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2018-2019 TechMahindra + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.universalvesadapter.service; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.onap.universalvesadapter.adapter.UniversalEventAdapter; +import org.onap.universalvesadapter.dmaap.Creator; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.exception.MapperConfigException; +import org.onap.universalvesadapter.exception.VesException; +import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival; +import org.onap.universalvesadapter.utils.DmaapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Service that starts the universal ves adapter module to listen for events + * + * @author kmalbari + * + */ +/** + * @author PM00501616 + * + */ +/** + * @author PM00501616 + * + */ +@Component +public class VesService { + + private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger"); + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + private boolean isRunning = true; + @Value("${defaultConfigFilelocation}") + private String defaultConfigFilelocation; + @Autowired + private Creator creator; + @Autowired + private UniversalEventAdapter eventAdapter; + @Autowired + private DmaapConfig dmaapConfig; + @Autowired + private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival; + private static List list = new LinkedList(); + + + /** + * method triggers universal VES adapter module. + */ + public void start() throws MapperConfigException { + debugLogger.info("Creating Subcriber and Publisher with creator............."); + String topicName = null; + String publisherTopic = null; + // Hashmap of subscriber and publisher details in correspondence to the respective + // collectors in kv file + Map dmaapTopics = collectorConfigPropertyRetrival.getDmaapTopics( + "stream_subscriber", "stream_publisher", defaultConfigFilelocation); + + ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size()); + for (Map.Entry entry : dmaapTopics.entrySet()) { + String threadName = entry.getKey(); + // subcriber and corresponding publisher topics in a Map + Map subpubTopics = collectorConfigPropertyRetrival + .getTopics(entry.getKey(), entry.getValue(), + defaultConfigFilelocation); + for (Map.Entry entry2 : subpubTopics.entrySet()) { + topicName = entry2.getKey(); + publisherTopic = entry2.getValue(); + } + + + // Publisher and subcriber as per each collector + DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName); + + DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic); + debugLogger.info("Created scriber topic:" + topicName + "publisher topic:" + + publisherTopic); + + executorService.submit(new Runnable() { + + @Override + public void run() { + + Thread.currentThread().setName(threadName); + metricsLogger.info( + "fetch and publish from and to Dmaap started:" + + Thread.currentThread() + .getName()); + int pollingInternalInt = dmaapConfig.getPollingInterval(); + debugLogger.info( + "The Polling Interval in Milli Second is :{}" + + pollingInternalInt); + debugLogger.info( + "starting subscriber & publisher thread:{}", + Thread.currentThread().getName()); + while (true) { + synchronized (this) { + for (String incomingJsonString : subcriber + .fetchMessages() + .getFetchedMessages()) { + list.add(incomingJsonString); + + } + + if (list.isEmpty()) { + try { + Thread.sleep(pollingInternalInt); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + debugLogger.debug( + "number of messages to be converted :{}", + list.size()); + + if (!list.isEmpty()) { + String val = ((LinkedList) list) + .removeFirst(); + List messages = + new ArrayList<>(); + String vesEvent = + processReceivedJson( + val); + if (vesEvent != null && (!(vesEvent + .isEmpty() + || vesEvent.equals( + "")))) { + messages.add(vesEvent); + publisher.publish(messages); + + metricsLogger.info( + "Message successfully published to DMaaP Topic-\n" + + vesEvent); + } + + } + + } + } + + + + } + }); + } + + + + } + + /** + * method stops universal ves adapter module + */ + public void stop() { + isRunning = false; + } + + + /** + * method for processing the incoming json to ves + * + * @param incomingJsonString + * @return ves + */ + private String processReceivedJson(String incomingJsonString) { + String outgoingJsonString = null; + if (!"".equals(incomingJsonString)) { + + try { + + outgoingJsonString = eventAdapter.transform(incomingJsonString); + + } catch (VesException exception) { + errorLogger.error( + "Received exception : {},{}" + + exception.getMessage(), + exception); + debugLogger.warn( + "APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + } catch (DMaapException e) { + errorLogger.error("Received exception : {}", e.getMessage()); + } + } + return outgoingJsonString; + } +} + diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java index 2262b9b..afa5c7c 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java @@ -1,88 +1,172 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2019 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.utils; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; - -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -public class CollectorConfigPropertyRetrival { - - @Value("${defaultConfigFilelocation}") - public String defaultConfigFilelocation; - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - private static JSONArray array; - - public static JSONArray collectorConfigArray(String configFile) { - try { - JSONParser parser = new JSONParser(); - - File file = new File(ClassLoader.getSystemResource(configFile.trim()).getFile()); - - FileReader fileReader = new FileReader(file); - JSONObject obj = (JSONObject) parser.parse(fileReader); - JSONObject appobj = (JSONObject) obj.get("app_preferences"); - array = (JSONArray) appobj.get("collectors"); - - debugLogger.info("Retrieved JsonArray from Collector Config File"); - - } catch (ParseException e) { - errorLogger.error("ParseException occured at position:", e.getPosition()); - } catch (FileNotFoundException e) { - - errorLogger.error("Collector Config File is not found..", e.getMessage()); - } catch (IOException e) { - - errorLogger.error("Error occured due to :", e.getMessage()); - } - - return array; - - } - - public static String[] getProperyArray(String properyName, String defaultConfigFilelocation) { - JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); - - String[] propertyArray = new String[jsonArray.size()]; - - for (int k = 0; k < jsonArray.size(); k++) { - - JSONObject collJson = (JSONObject) jsonArray.get(k); - - propertyArray[k] = (String) collJson.get(properyName); - } - debugLogger.info("returning " + properyName + " array from Collector Config"); - return propertyArray; - - } - -} +/*- + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2018-2019 TechMahindra + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.universalvesadapter.utils; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ResourceUtils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +@Component +public class CollectorConfigPropertyRetrival { + + + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + private static JSONArray array; + @Autowired + private DmaapConfig dmaapConfig; + + public static JSONArray collectorConfigArray(String configFile) { + try { + JSONParser parser = new JSONParser(); + String content = readFile(configFile); + JSONObject obj = (JSONObject) parser.parse(content); + JSONObject appobj = (JSONObject) obj.get("app_preferences"); + array = (JSONArray) appobj.get("collectors"); + + debugLogger.info("Retrieved JsonArray from Collector Config File"); + + } catch (ParseException e) { + errorLogger.error("ParseException occured at position:", e.getPosition()); + } + + + return array; + + } + + public static String[] getProperyArray(String properyName, + String defaultConfigFilelocation) { + JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); + + String[] propertyArray = new String[jsonArray.size()]; + + for (int k = 0; k < jsonArray.size(); k++) { + + JSONObject collJson = (JSONObject) jsonArray.get(k); + + propertyArray[k] = (String) collJson.get(properyName); + } + debugLogger.info("returning " + properyName + " array from Collector Config"); + return propertyArray; + + } + + public Map getDmaapTopics(String subscriber, String publisher, + String defaultConfigFilelocation) { + JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); + + Map dmaapTopics = new HashMap<>(); + + for (int k = 0; k < jsonArray.size(); k++) { + + JSONObject collJson = (JSONObject) jsonArray.get(k); + + dmaapTopics.put(collJson.get(subscriber).toString(), + collJson.get(publisher).toString()); + + } + debugLogger.info("returning Dmaap topics from Collector Config"); + return dmaapTopics; + + } + + public Map getTopics(String subscriber, String publisher, + String defaultConfigFilelocation) { + Map dmaapTopics = new HashMap<>(); + + try { + + ObjectMapper objectMapper = new ObjectMapper(); + String content = readFile(defaultConfigFilelocation); + // read JSON like DOM Parser + JsonNode rootNode = objectMapper.readTree(content); + JsonNode subscriberUrl = rootNode.path("streams_subscribes") + .path(subscriber).path("dmaap_info").path("topic_url"); + JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher) + .path("dmaap_info").path("topic_url"); + + dmaapTopics.put(getTopicName(subscriberUrl.asText()), + getTopicName(publisherUrl.asText())); + setDmaapConfig(subscriberUrl.asText()); + } catch (IOException ex) { + errorLogger.error("IOException occured:" + ex.getMessage()); + + } catch (URISyntaxException e) { + + errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason()); + } + + return dmaapTopics; + + } + + public String getTopicName(String url) throws URISyntaxException { + URI uri = new URI(url); + String path = uri.getPath(); + String idStr = path.substring(path.lastIndexOf('/') + 1); + return idStr; + + } + + public void setDmaapConfig(String url) throws URISyntaxException { + URI uri = new URI(url); + dmaapConfig.setDmaaphost(uri.getHost()); + dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort()); + + } + + public static String readFile(String configFileName) { + String content = null; + File file = null; + + try { + file = ResourceUtils.getFile("classpath:" + configFileName); + content = new String(Files.readAllBytes(file.toPath())); + } catch (FileNotFoundException e) { + errorLogger.error("colud not find file :", configFileName); + + } catch (IOException e) { + errorLogger.error("unable to read the file , reason:", e.getCause()); + } + + return content; + + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java index 02b54c9..82c23b3 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java @@ -1,287 +1,272 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ -package org.onap.universalvesadapter.utils; - -import javax.validation.constraints.NotEmpty; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.PropertySource; -import org.springframework.stereotype.Component; - -@Component -@PropertySource(value = {"classpath:application.properties","classpath:mapper.properties"}) -@ConfigurationProperties -public class DmaapConfig { - - // Hostname of DMaaP to be taken from ENV var - @NotEmpty - private String dmaaphost; - - // default port number to be taken from ENV var - @NotEmpty - private int DEFAULT_PORT_NUMBER; - - @Value("${mr.POLLING_INTERVAL}") - @NotEmpty - private int pollingInterval; - - // default to no username - @Value("${mr.DEFAULT_USER_NAME}") - private String DEFAULT_USER_NAME; - - - //defaults to no userPassword - @Value("${mr.DEFAULT_USER_PASSWORD}") - private String DEFAULT_USER_PASSWORD; - - //defaults to using https protocol - @Value("${mr.DEFAULT_PROTOCOL}") - @NotEmpty - private String DEFAULT_PROTOCOL; - - //defaults to json content type - @Value("${mr.DEFAULT_CONTENT_TYPE}") - @NotEmpty - private String DEFAULT_CONTENT_TYPE; - - @Value("${mr.DMAAP_URI_PATH_PREFIX}") - @NotEmpty - private String DMAAP_URI_PATH_PREFIX; - - @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}") - @NotEmpty - private String DMAAP_DEFAULT_CONSUMER_ID; - - @Value("${mr.DMAAP_GROUP_PREFIX}") - @NotEmpty - private String DMAAP_GROUP_PREFIX; - - // Publisher Constants - - //Dmaap Publisher Topic - @Value("${mr.publisher.topic}") - @NotEmpty - private String publisherTopic; - - //disable batching by default - @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}") - @NotEmpty - private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; - - //default recovery messages size - @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}") - @NotEmpty - private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; - -//number of retries when flushing messages - @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}") - @NotEmpty - private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; - - //delay in retrying for flushing messages - @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}") - @NotEmpty - private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; - - // Subscriber Constants - - @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}") - @NotEmpty - private int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; - - @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}") - @NotEmpty - private int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; - - @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}") - @NotEmpty - private String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; - - @Value("${mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}") - @NotEmpty - private String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; - - @Value("${mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}") - @NotEmpty - private String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; - - public void setDmaaphost(String dmaaphost) { - this.dmaaphost = dmaaphost; - } - - public String getDmaaphost() { - return dmaaphost; - } - - public int getDEFAULT_PORT_NUMBER() { - return DEFAULT_PORT_NUMBER; - } - - public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) { - DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER; - } - - public String getDEFAULT_USER_NAME() { - return DEFAULT_USER_NAME; - } - - public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) { - DEFAULT_USER_NAME = dEFAULT_USER_NAME; - } - - public String getDEFAULT_USER_PASSWORD() { - return DEFAULT_USER_PASSWORD; - } - - public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) { - DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD; - } - - public String getDEFAULT_PROTOCOL() { - return DEFAULT_PROTOCOL; - } - - public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) { - DEFAULT_PROTOCOL = dEFAULT_PROTOCOL; - } - - public String getDEFAULT_CONTENT_TYPE() { - return DEFAULT_CONTENT_TYPE; - } - - public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) { - DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE; - } - - public String getDMAAP_URI_PATH_PREFIX() { - return DMAAP_URI_PATH_PREFIX; - } - - public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) { - DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX; - } - - public String getDMAAP_DEFAULT_CONSUMER_ID() { - return DMAAP_DEFAULT_CONSUMER_ID; - } - - public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) { - DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID; - } - - public String getDMAAP_GROUP_PREFIX() { - return DMAAP_GROUP_PREFIX; - } - - public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) { - DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX; - } - - public String getPublisherTopic() { - return publisherTopic; - } - - public void setPublisherTopic(String publisherTopic) { - this.publisherTopic = publisherTopic; - } - - public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() { - return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; - } - - public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) { - this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; - } - - public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() { - return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; - } - - public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE( - int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) { - this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; - } - - public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() { - return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; - } - - public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) { - this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; - } - - public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() { - return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; - } - - public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) { - this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; - } - - public int getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() { - return subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; - } - - public void setsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) { - this.subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; - } - - public int getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() { - return subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; - } - - public void setsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) { - this.subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; - } - - public String getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() { - return subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; - } - - public void setsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) { - this.subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; - } - - public String getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() { - return subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; - } - - public void setsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) { - this.subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; - } - - public String getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() { - return subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; - } - - public void setsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) { - this.subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; - } - - public int getPollingInterval() { - return pollingInterval; - } - - public void setPollingInterval(int pollingInterval) { - this.pollingInterval = pollingInterval; - } - - -} +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ +package org.onap.universalvesadapter.utils; + +import javax.validation.constraints.NotEmpty; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Component; + +@Component +@PropertySource(value = {"classpath:application.properties","classpath:mapper.properties"}) +@ConfigurationProperties +public class DmaapConfig { + + @NotEmpty + private String dmaaphost; + + @NotEmpty + private int DEFAULT_PORT_NUMBER; + + @Value("${mr.POLLING_INTERVAL}") + @NotEmpty + private int pollingInterval; + + // default to no username + @Value("${mr.DEFAULT_USER_NAME}") + private String DEFAULT_USER_NAME; + + + //defaults to no userPassword + @Value("${mr.DEFAULT_USER_PASSWORD}") + private String DEFAULT_USER_PASSWORD; + + //defaults to using https protocol + @Value("${mr.DEFAULT_PROTOCOL}") + @NotEmpty + private String DEFAULT_PROTOCOL; + + //defaults to json content type + @Value("${mr.DEFAULT_CONTENT_TYPE}") + @NotEmpty + private String DEFAULT_CONTENT_TYPE; + + @Value("${mr.DMAAP_URI_PATH_PREFIX}") + @NotEmpty + private String DMAAP_URI_PATH_PREFIX; + + @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}") + @NotEmpty + private String DMAAP_DEFAULT_CONSUMER_ID; + + @Value("${mr.DMAAP_GROUP_PREFIX}") + @NotEmpty + private String DMAAP_GROUP_PREFIX; + + // Publisher Constants + + //disable batching by default + @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}") + @NotEmpty + private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + + //default recovery messages size + @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}") + @NotEmpty + private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + +//number of retries when flushing messages + @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}") + @NotEmpty + private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + + //delay in retrying for flushing messages + @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}") + @NotEmpty + private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + + // Subscriber Constants + + @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}") + @NotEmpty + private int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + + @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}") + @NotEmpty + private int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + + @Value("${mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}") + @NotEmpty + private String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + + @Value("${mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}") + @NotEmpty + private String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + + @Value("${mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}") + @NotEmpty + private String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + + public void setDmaaphost(String dmaaphost) { + this.dmaaphost = dmaaphost; + } + + public String getDmaaphost() { + return dmaaphost; + } + + public int getDEFAULT_PORT_NUMBER() { + return DEFAULT_PORT_NUMBER; + } + + public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) { + DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER; + } + + public String getDEFAULT_USER_NAME() { + return DEFAULT_USER_NAME; + } + + public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) { + DEFAULT_USER_NAME = dEFAULT_USER_NAME; + } + + public String getDEFAULT_USER_PASSWORD() { + return DEFAULT_USER_PASSWORD; + } + + public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) { + DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD; + } + + public String getDEFAULT_PROTOCOL() { + return DEFAULT_PROTOCOL; + } + + public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) { + DEFAULT_PROTOCOL = dEFAULT_PROTOCOL; + } + + public String getDEFAULT_CONTENT_TYPE() { + return DEFAULT_CONTENT_TYPE; + } + + public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) { + DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE; + } + + public String getDMAAP_URI_PATH_PREFIX() { + return DMAAP_URI_PATH_PREFIX; + } + + public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) { + DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX; + } + + public String getDMAAP_DEFAULT_CONSUMER_ID() { + return DMAAP_DEFAULT_CONSUMER_ID; + } + + public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) { + DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID; + } + + public String getDMAAP_GROUP_PREFIX() { + return DMAAP_GROUP_PREFIX; + } + + public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) { + DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX; + } + + public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() { + return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + } + + public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) { + this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + } + + public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() { + return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + } + + public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE( + int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) { + this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + } + + public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() { + return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + } + + public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) { + this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + } + + public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() { + return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + } + + public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) { + this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + } + + public int getsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() { + return subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + } + + public void setsubscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) { + this.subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subscriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + } + + public int getsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() { + return subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + } + + public void setsubscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) { + this.subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subscriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + } + + public String getsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() { + return subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + } + + public void setsubscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) { + this.subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subscriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + } + + public String getsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() { + return subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + } + + public void setsubscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) { + this.subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + } + + public String getsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() { + return subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + } + + public void setsubscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) { + this.subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subscriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + } + + public int getPollingInterval() { + return pollingInterval; + } + + public void setPollingInterval(int pollingInterval) { + this.pollingInterval = pollingInterval; + } + + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java index af219ce..4bc66bb 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java @@ -1,196 +1,223 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2019 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.utils; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Map; -import org.json.JSONArray; -import org.json.JSONObject; -import org.json.JSONTokener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -@Component -public class FetchDynamicConfig { - - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - - private static String url; - public static String retString; - public static String retCBSString; - private static Map env; - - public FetchDynamicConfig() { - } - - public static void cbsCall(String configFile) { - - env = System.getenv(); - Boolean areEqual; - // Call consul api and identify the CBS Service address and port - getconsul(); - // Construct and invoke CBS API to get application Configuration - getCBS(); - // Verify if data has changed - areEqual = verifyConfigChange(configFile); - - if (!areEqual) { - FetchDynamicConfig fc = new FetchDynamicConfig(); - fc.writefile(retCBSString,configFile); - } else { - debugLogger.info("New config pull results identical - " + configFile + " NOT refreshed"); - } - } - - - private static void getconsul() { - url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE"); - retString = executecurl(url); - debugLogger.info("CBS details fetched from Consul"); - } - - public static boolean verifyConfigChange(String configFile) { - - boolean areEqual = false; - // Read current data - try { - File f = new File(configFile); - if (f.exists() && !f.isDirectory()) { - - String jsonData = readFile(configFile); - JSONObject jsonObject = new JSONObject(jsonData); - - ObjectMapper mapper = new ObjectMapper(); - - JsonNode tree1 = mapper.readTree(jsonObject.toString()); - JsonNode tree2 = mapper.readTree(retCBSString); - areEqual = tree1.equals(tree2); - debugLogger.info("Comparison value:" + areEqual); - } else { - debugLogger.info("First time config file read: " + configFile); - } - - } catch (IOException e) { - errorLogger.error("Comparison with new fetched data failed" + e.getMessage()); - - } - - return areEqual; - - } - - public static void getCBS() { - - // consul return as array - JSONTokener temp = new JSONTokener(retString); - JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0); - - String urlPart1 = null; - if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) { - - urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort"); - - } - debugLogger.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1); - - if (env.containsKey("HOSTNAME")) { - url = urlPart1 + "/service_component/" + env.get("HOSTNAME"); - retCBSString = executecurl(url); - } else if (env.containsKey("SERVICE_NAME")) { - url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME"); - retCBSString = executecurl(url); - } else { - errorLogger.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container "); - } - - } - - public void writefile(String retCBSString, String configFile) { - debugLogger.info("URL to fetch configuration:" + url); - - String indentedretstring = (new JSONObject(retCBSString)).toString(4); - - try (FileWriter file = new FileWriter(configFile)) { - file.write(indentedretstring); - - debugLogger.info("Successfully Copied JSON Object to file " + configFile); - } catch (IOException e) { - errorLogger.error("Error in writing configuration into file " + configFile + retString + e.getMessage()); - e.printStackTrace(); - } - - } - - public static String readFile(String filename) { - String result = ""; - try (BufferedReader br = new BufferedReader(new FileReader(filename))) { - StringBuilder sb = new StringBuilder(); - String line = br.readLine(); - while (line != null) { - sb.append(line); - line = br.readLine(); - } - result = sb.toString(); - } catch (FileNotFoundException e) { - errorLogger.error("colud not find file :",filename); - - } catch (Exception e) { - errorLogger.error("unable to read the file , reason:",e.getCause()); - } - return result; - } - private static String executecurl(String url) { - - String[] command = { "curl", "-v", url }; - ProcessBuilder process = new ProcessBuilder(command); - Process p; - String result = null; - try { - p = process.start(); - InputStreamReader ipr = new InputStreamReader(p.getInputStream()); - BufferedReader reader = new BufferedReader(ipr); - StringBuilder builder = new StringBuilder(); - String line; - - while ((line = reader.readLine()) != null) { - builder.append(line); - } - result = builder.toString(); - reader.close(); - ipr.close(); - } catch (IOException e) { - errorLogger.error("error", e); - e.printStackTrace(); - } - return result; - - } - -} +/*- + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2019 TechMahindra + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.universalvesadapter.utils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.util.Map; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.util.ResourceUtils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +@Component +public class FetchDynamicConfig { + + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + private static String url; + public static String retString; + public static String retCBSString; + private static Map env; + + public FetchDynamicConfig() {} + + public static void cbsCall(String configFile) { + + env = System.getenv(); + Boolean areEqual; + // Call consul api and identify the CBS Service address and port + getconsul(); + // Construct and invoke CBS API to get application Configuration + getCBS(); + // Verify if data has changed + areEqual = verifyConfigChange(configFile); + + if (!areEqual) { + FetchDynamicConfig fc = new FetchDynamicConfig(); + fc.writefile(retCBSString, configFile); + } else { + debugLogger.info("New config pull results identical - " + configFile + + " NOT refreshed"); + } + } + + + private static void getconsul() { + url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + + env.get("CONFIG_BINDING_SERVICE"); + retString = executecurl(url); + debugLogger.info("CBS details fetched from Consul"); + } + + public static boolean verifyConfigChange(String configFile) { + + boolean areEqual = false; + // Read current data + try { + + File f = new File( + ClassLoader.getSystemResource(configFile.trim()).getFile()); + + if (f.exists() && !f.isDirectory()) { + debugLogger.info( + "Comparing local configuration with the configuration fethed from CBS "); + + String jsonData = readFile(configFile); + JSONObject jsonObject = new JSONObject(jsonData); + + ObjectMapper mapper = new ObjectMapper(); + + JsonNode tree1 = mapper.readTree(jsonObject.toString()); + JsonNode tree2 = mapper.readTree(retCBSString); + areEqual = tree1.equals(tree2); + debugLogger.info("Comparison value:" + areEqual); + } else { + debugLogger.info("First time config file read: " + configFile); + } + + } catch (IOException e) { + errorLogger.error( + "Comparison with new fetched data failed" + e.getMessage()); + + } + + return areEqual; + + } + + public static void getCBS() { + + // consul return as array + JSONTokener temp = new JSONTokener(retString); + JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0); + + String urlPart1 = null; + if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) { + + urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + + cbsjobj.getInt("ServicePort"); + + } + debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1); + + if (env.containsKey("HOSTNAME")) { + url = urlPart1 + "/service_component/" + env.get("HOSTNAME"); + retCBSString = executecurl(url); + debugLogger.info("Configuration fetched from CBS successfully.."); + } else if (env.containsKey("SERVICE_NAME")) { + url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME"); + retCBSString = executecurl(url); + debugLogger.info("Configuration fetched from CBS successfully.."); + } else { + errorLogger.error( + "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container "); + } + + } + + public void writefile(String retCBSString, String configFile) { + + String indentedretstring = (new JSONObject(retCBSString)).toString(4); + File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile()); + try { + debugLogger.info("Overwriting local configuration file " + configFile + + " with configuartions received from CBS"); + + + File file2 = ResourceUtils.getFile("classpath:" + configFile); + FileWriter fstream = new FileWriter(file2, false); + PrintWriter printWriter = new PrintWriter(fstream); + printWriter.print(indentedretstring); + printWriter.close(); + + debugLogger.info("New Config successfully written to local file to " + + configFile); + } catch (IOException e) { + errorLogger.error("Error in writing configuration into local KV file " + + configFile + retString + e.getMessage()); + e.printStackTrace(); + } + + } + + public static String readFile(String configFileName) { + String content = null; + File file = null; + + try { + file = ResourceUtils.getFile("classpath:" + configFileName); + content = new String(Files.readAllBytes(file.toPath())); + } catch (FileNotFoundException e) { + errorLogger.error("colud not find file :", file.getName()); + + } catch (IOException e) { + errorLogger.error("unable to read the file , reason:", e.getCause()); + } catch (Exception e) { + errorLogger.error("Exception occured , reason:", e.getMessage()); + } + + return content; + + } + + private static String executecurl(String url) { + + String[] command = {"curl", "-v", url}; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr); + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + reader.close(); + ipr.close(); + } catch (IOException e) { + errorLogger.error("error", e); + e.printStackTrace(); + } + return result; + + } + +} diff --git a/UniversalVesAdapter/src/main/resources/defaultSnmpMappingFile.xml b/UniversalVesAdapter/src/main/resources/defaultSnmpMappingFile.xml index 5f31fab..4263137 100644 --- a/UniversalVesAdapter/src/main/resources/defaultSnmpMappingFile.xml +++ b/UniversalVesAdapter/src/main/resources/defaultSnmpMappingFile.xml @@ -1,51 +1,43 @@ - - + - + - + - - "3.0" + + '3.0' 'FaultField' 'XXXX' 'VESMapper' - org.onap.dcaegen2.ves.domain.ves5_4.CommonEventHeader.Domain.FAULT + org.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Domain.FAULT commonEventHeader.domain - - - org.onap.dcaegen2.ves.domain.ves5_4.CommonEventHeader.Priority.NORMAL - "VesAdapter" - - - - - - "SNMP Fault" - org.onap.dcaegen2.ves.domain.ves5_4.FaultFields.VfStatus.ACTIVE - - org.onap.dcaegen2.ves.domain.ves5_4.FaultFields.EventSeverity.MINOR - - - - - - - - - - - - - - + + + org.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Priority.NORMAL + 'VesAdapter' + + + + + 'SNMP Fault' + org.onap.dcaegen2.ves.domain.ves54.FaultFields.VfStatus.ACTIVE + org.onap.dcaegen2.ves.domain.ves54.FaultFields.EventSeverity.MINOR + + + + + + + + + \ No newline at end of file diff --git a/UniversalVesAdapter/src/main/resources/kv.json b/UniversalVesAdapter/src/main/resources/kv.json index 9e245d1..03c87e9 100644 --- a/UniversalVesAdapter/src/main/resources/kv.json +++ b/UniversalVesAdapter/src/main/resources/kv.json @@ -2,23 +2,53 @@ "app_preferences":{ "collectors":[ { - "identifier":"notification-id", - "subscriberTopic":"ONAP-COLLECTOR-RESTCONFTRAP", + "stream_subscriber":"rcc-notification", "mapping-files":[ { - "defaultMappingFile-ONAP-COLLECTOR-RESTCONFTRAP":"<\/json:keyMap><\/json:reader><\/jb:bean><\/jb:bean>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1<\/jb:expression>'pnfRegistration'<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1<\/jb:expression>'registration_'+commonEventHeader.ts1<\/jb:expression>'VESMapper'<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATION<\/jb:expression>commonEventHeader.domain<\/jb:expression>commonEventHeader.ts1<\/jb:expression>commonEventHeader.ts1<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMAL<\/jb:expression><\/jb:bean>org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0<\/jb:expression><\/jb:bean><\/smooks-resource-list>" + "defaultMappingFile-rcc-notification":"org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1'pnfRegistration'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1'registration_'+commonEventHeader.ts1'VESMapper'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATIONcommonEventHeader.domaincommonEventHeader.ts1commonEventHeader.ts1org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMALpnfRegistrationFields.vendorName+'-'+pnfRegistrationFields.serialNumberorg.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0" } - ] + ], + "identifier":"notification-id", + "stream_publisher":"ves-pnfRegistration" }, - { + { "identifier":"notify OID", - "subscriberTopic":"ONAP-COLLECTOR-SNMPTRAP", + "stream_subscriber":"snmp-notification", "mapping-files":[ { - "defaultMappingFile-ONAP-COLLECTOR-SNMPTRAP":"'3.0''FaultField''XXXX''VESMapper'org.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Domain.FAULTcommonEventHeader.domainorg.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Priority.NORMAL'VesAdapter''SNMP Fault'org.onap.dcaegen2.ves.domain.ves54.FaultFields.VfStatus.ACTIVEorg.onap.dcaegen2.ves.domain.ves54.FaultFields.EventSeverity.MINOR" + "defaultMappingFile-snmp-notification":"'3.0''FaultField''XXXX''VESMapper'org.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Domain.FAULTcommonEventHeader.domainorg.onap.dcaegen2.ves.domain.ves54.CommonEventHeader.Priority.NORMAL'VesAdapter''SNMP Fault'org.onap.dcaegen2.ves.domain.ves54.FaultFields.VfStatus.ACTIVEorg.onap.dcaegen2.ves.domain.ves54.FaultFields.EventSeverity.MINOR" } - ] + ], + "stream_publisher":"ves-fault" } ] + }, + "streams_publishes":{ + "ves-fault":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://10.53.172.156:3904/events/unauthenticated.SEC_FAULT_OUTPUT" + } + }, + "ves-pnfRegistration":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://10.53.172.156:3904/events/unauthenticated.VES_PNFREG_OUTPUT" + } + } + }, + "streams_subscribes":{ + "snmp-notification":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://10.53.172.156:3904/events/ONAP-COLLECTOR-SNMPTRAP" + } + }, + "rcc-notification":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://10.53.172.156:3904/events/ONAP-COLLECTOR-RESTCONFTRAP" + } + } } } \ No newline at end of file diff --git a/UniversalVesAdapter/src/main/resources/kvTest.json b/UniversalVesAdapter/src/main/resources/kvTest.json deleted file mode 100644 index c4231cb..0000000 --- a/UniversalVesAdapter/src/main/resources/kvTest.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "app_preferences":{ - "collectors":[ - { - "identifier":"notification-id", - "subscriberTopic":"ONAP-COLLECTOR-RESTCONFTRAP", - "mapping-files":[ - { - "defaultMappingFile-ONAP-COLLECTOR-RESTCONFTRAP":"<\/json:keyMap><\/json:reader><\/jb:bean><\/jb:bean>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1<\/jb:expression>'pnfRegistration'<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1<\/jb:expression>'registration_'+commonEventHeader.ts1<\/jb:expression>'VESMapper'<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATION<\/jb:expression>commonEventHeader.domain<\/jb:expression>commonEventHeader.ts1<\/jb:expression>commonEventHeader.ts1<\/jb:expression>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMAL<\/jb:expression><\/jb:bean>org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0<\/jb:expression><\/jb:bean><\/smooks-resource-list>" - } - ] - } - ] - } -} \ No newline at end of file diff --git a/UniversalVesAdapter/src/main/resources/mapper.properties b/UniversalVesAdapter/src/main/resources/mapper.properties index d85010f..76751a9 100644 --- a/UniversalVesAdapter/src/main/resources/mapper.properties +++ b/UniversalVesAdapter/src/main/resources/mapper.properties @@ -1,41 +1,41 @@ - # ///////////////// DMaaP Config Constants - - # defaults to no username -mr.DEFAULT_USER_NAME=null - # defaults to no userPassword -mr.DEFAULT_USER_PASSWORD=null - #d efaults to using https protocol -mr.DEFAULT_PROTOCOL=http - # defaults to json content type -mr.DEFAULT_CONTENT_TYPE=application/json - -mr.DMAAP_URI_PATH_PREFIX=/events/ -mr.DMAAP_DEFAULT_CONSUMER_ID=con2 -mr.DMAAP_GROUP_PREFIX=grp2 - -#DMaaP MR subscriber thread's polling interval in milli second -mr.POLLING_INTERVAL=10000 - -# ///////////////// Publisher Constants - -#Dmaap Publisher Topic -mr.publisher.topic=unauthenticated.SEC_FAULT_OUTPUT -#disable batching by default -mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE=1 -# default recovery messages size -mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE=100000 -#number of retries when flushing messages -mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE=5 -#delay in retrying for flushing messages -mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE=5000 - - -#////////////////// Subscriber Constants -mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS=-1 -mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT=-1 -mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX=grp2 -mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME=timeout -mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME=limit -mr.mr_URI_PATH_PREFIX = /events/ -mr.mr_GROUP_PREFIX = grp2 -mr.mr_DEFAULT_CONSUMER_ID = con2 + # ///////////////// DMaaP Config Constants + + # defaults to no username +mr.DEFAULT_USER_NAME=null + # defaults to no userPassword +mr.DEFAULT_USER_PASSWORD=null + #d efaults to using https protocol +mr.DEFAULT_PROTOCOL=http + # defaults to json content type +mr.DEFAULT_CONTENT_TYPE=application/json + +mr.DMAAP_URI_PATH_PREFIX=/events/ +mr.DMAAP_DEFAULT_CONSUMER_ID=con2 +mr.DMAAP_GROUP_PREFIX=grp2 + +#DMaaP MR subscriber thread's polling interval in milli second +mr.POLLING_INTERVAL=10000 + +# ///////////////// Publisher Constants + +#Dmaap Publisher Topic +mr.publisher.topic=unauthenticated.VES_PNFREG_OUTPUT +#disable batching by default +mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE=1 +# default recovery messages size +mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE=100000 +#number of retries when flushing messages +mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE=5 +#delay in retrying for flushing messages +mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE=5000 + + +#////////////////// Subscriber Constants +mr.subscriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS=-1 +mr.subscriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT=-1 +mr.subscriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX=grp2 +mr.subscriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME=timeout +mr.subscriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME=limit +mr.mr_URI_PATH_PREFIX = /events/ +mr.mr_GROUP_PREFIX = grp2 +mr.mr_DEFAULT_CONSUMER_ID = con2 -- cgit 1.2.3-korg