diff options
author | Pooja03 <PM00501616@techmahindra.com> | 2018-08-09 17:49:09 +0530 |
---|---|---|
committer | Pooja03 <PM00501616@techmahindra.com> | 2018-08-09 17:49:09 +0530 |
commit | c17ce648ecc3453df8754b936f2b344f13f6dc65 (patch) | |
tree | dc57f5dc5bc9a6d02b415b11ceed2ae31e573af3 /UniversalVesAdapter/src/main/java | |
parent | 1463aaab6db65130de04d84a68fd9331a1c0caa9 (diff) |
Integratation of DMaaP, Mapping File
DMaaP integratation, Mapping File, Initialization of Adapter
Change-Id: I826aa2e64fa7c155f088a7519c24887ce88e2ec4
Issue-ID: DCAEGEN2-335
Signed-off-by: Pooja03 <PM00501616@techmahindra.com>
Diffstat (limited to 'UniversalVesAdapter/src/main/java')
47 files changed, 3296 insertions, 1001 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java index fa3b89c..95b31fb 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/GenericAdapter.java @@ -19,7 +19,7 @@ */ package org.onap.universalvesadapter.adapter; -import org.onap.universalvesadapter.exception.ConfigFileReadException; + import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; import org.onap.universalvesadapter.exception.VesException; @@ -32,7 +32,6 @@ import org.onap.universalvesadapter.exception.VesException; */ public interface GenericAdapter { -// String transform(String incomingJsonString) throws ConfigFileReadException; /** * It will take in an incoming json and identify the json type for different @@ -42,10 +41,9 @@ public interface GenericAdapter { * @param incomingJsonString json that is received on DMaap topic * @param eventType type identified from incoming json * @return VES format json - * @throws ConfigFileReadException if unable to read the configuration file * @throws ConfigFileSmooksConversionException if unable to convert config file data to smooks object - * @throws VesException if unable to convert into ves json + * */ - String transform(String incomingJsonString, String eventType) throws ConfigFileReadException, ConfigFileSmooksConversionException, VesException; + String transform(String incomingJsonString, String eventType) throws ConfigFileSmooksConversionException, VesException; } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java index 318c8cd..65c7b9c 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/adapter/UniversalEventAdapter.java @@ -24,26 +24,25 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import javax.annotation.PreDestroy; -import javax.annotation.Resource; - import org.milyn.Smooks; import org.onap.dcaegen2.ves.domain.VesEvent; -import org.onap.universalvesadapter.configs.UniversalEventConfiguration; -import org.onap.universalvesadapter.exception.ConfigFileReadException; import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; import org.onap.universalvesadapter.exception.VesException; -import org.onap.universalvesadapter.service.ConfigFileService; +import org.onap.universalvesadapter.service.VESAdapterInitializer; import org.onap.universalvesadapter.utils.SmooksUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xml.sax.SAXException; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSyntaxException; /** * Default implementation of the Generic Adapter @@ -51,86 +50,88 @@ import com.fasterxml.jackson.databind.ObjectMapper; * @author kmalbari * */ + @Component -public class UniversalEventAdapter implements GenericAdapter{ - +public class UniversalEventAdapter implements GenericAdapter { + private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private UniversalEventConfiguration configuration; - - @Resource(name="diskRepoConfigFileService") - private ConfigFileService configFileService; - -// private Smooks smooks; - + + private String enterpriseId; + @Value("${defaultMappingFileName}") + private String defaulMappingFileName; private Map<String, Smooks> eventToSmooksMapping = new ConcurrentHashMap<>(); - /*public String transform(String incomingJsonString) throws ConfigFileReadException { - String result = ""; - try { - //reading config file.. for now, looking at it as just one time operation - if(null == smooks){ - String configFileData = configFileService.readConfigFile(configuration.getConfigFile()); - smooks = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8))); - } - - VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooks, incomingJsonString); - ObjectMapper objectMapper = new ObjectMapper(); - result = objectMapper.writeValueAsString(vesEvent); - } catch (IOException | SAXException e) { - e.printStackTrace(); - } - - return result; - }*/ - + public UniversalEventAdapter() { + } + /** + * transforms JSON to VES format and and returns the ves Event + * + * @param IncomingJason,eventType + * @return ves Event + */ @Override - public String transform(String incomingJsonString, String eventType) throws ConfigFileReadException, - ConfigFileSmooksConversionException, VesException { + public String transform(String incomingJsonString, String eventType) + throws ConfigFileSmooksConversionException, VesException { String result = ""; + String configFileData; try { - if(null == eventToSmooksMapping.get(eventType)){ - LOGGER.debug("No smooks mapping for this event type " + eventType + ".. reading config file"); - String configFileData = configFileService.readConfigFile(configuration.getConfigForEvent(eventType)); - LOGGER.debug("Read config file " + configFileData); + + Gson gson = new Gson(); + JsonObject body = gson.fromJson(incomingJsonString, JsonObject.class); + JsonElement results = body.get("notify OID"); + String notifyOid = results.getAsString(); + + // extracting enterprise id from notify OID of SNMP trap. + enterpriseId = notifyOid.substring(0, notifyOid.length() - 4); + + if (VESAdapterInitializer.getMappingFiles().containsKey(enterpriseId)) { + configFileData = VESAdapterInitializer.getMappingFiles().get(enterpriseId); + LOGGER.debug("Using Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId); + } else { + + configFileData = VESAdapterInitializer.getMappingFiles().get(defaulMappingFileName); + LOGGER.debug("Using Default Mapping file as Mapping file is not available for Enterprise Id:{}",enterpriseId); + } + Smooks smooksTemp = new Smooks(new ByteArrayInputStream(configFileData.getBytes(StandardCharsets.UTF_8))); eventToSmooksMapping.put(eventType, smooksTemp); - LOGGER.debug("Added smooks mapping for event type" + eventType); - } - - - LOGGER.debug("Read smooks mapping for event type" + eventType); - LOGGER.debug("Transforming incoming json now"); - VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(eventToSmooksMapping.get(eventType), incomingJsonString); + + VesEvent vesEvent = SmooksUtils.getTransformedObjectForInput(smooksTemp,incomingJsonString); LOGGER.debug("Incoming json transformed to VES format successfully"); ObjectMapper objectMapper = new ObjectMapper(); - result = objectMapper.writeValueAsString(vesEvent); + result = objectMapper.writeValueAsString(vesEvent); LOGGER.debug("Serialized VES json"); } catch (JsonProcessingException exception) { - throw new VesException("Unable to convert pojo to VES format" + "\n Reason :" + exception.getMessage()); + throw new VesException("Unable to convert pojo to VES format, Reason :{}", exception); } catch (SAXException | IOException exception) { - throw new ConfigFileSmooksConversionException("Unable to convert config file into smooks for event type " + eventType - + "\n Reason :" + exception.getMessage()); + //Invalid Mapping file + LOGGER.error("Dropping this Trap :{},due to error Occured :Reason:", incomingJsonString, exception); + + } catch (JsonSyntaxException exception) { + // Invalid Trap + LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception); + }catch (JsonParseException exception) { + // Invalid Trap + LOGGER.error("Dropping this Invalid json Trap :{}, Reason:", incomingJsonString, exception); + } + catch (RuntimeException exception) { + + LOGGER.error("Dropping this Trap :{},Reason:", incomingJsonString, exception); + } return result; } - - + /** * Closes all open smooks' instances before bean is destroyed */ @PreDestroy - public void destroy(){ -// if(null != smooks) -// smooks.close(); - - for(Smooks smooks : eventToSmooksMapping.values()) + public void destroy() { + for (Smooks smooks : eventToSmooksMapping.values()) smooks.close(); - LOGGER.debug("All Smooks objects closed"); - } + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java deleted file mode 100644 index e47af70..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/Configuration.java +++ /dev/null @@ -1,24 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.configs; - -public abstract class Configuration { - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java new file mode 100644 index 0000000..fd4185b --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRBaseConfig.java @@ -0,0 +1,187 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.configs; + + + +import java.util.Locale; + +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.utils.HTTPUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.google.common.base.Objects; + + +/** + * <p> + * Contains common parameters for both DMaaP Message Router Publisher and Subscriber Configs + * <p> + * @author Rajiv Singla . Creation Date: 10/12/2016. + */ +@Component +public abstract class DMaaPMRBaseConfig { + + protected static final Logger LOG = LoggerFactory.getLogger(DMaaPMRBaseConfig.class); + + protected String hostName; + protected Integer portNumber; + protected String topicName; + protected String protocol; + protected String userName; + protected String userPassword; + protected String contentType; + + /** + * Provides host name e.g. mrlocal-mtnjftle01.homer.com + * + * @return host name + */ + public String getHostName() { + return hostName; + } + + + /** + * Provides Port Number of DMaaP MR Topic Host. Defaults to 80 + * + * @return host port number + */ + public Integer getPortNumber() { + return portNumber; + } + + /** + * Provides topic name e.g. com.dcae.dmaap.mtnje2.DcaeTestVES + * + * @return topic name + */ + public String getTopicName() { + return topicName; + } + + /** + * Provides protocol type e.g. http or https + * + * @return protocol type + */ + public String getProtocol() { + return protocol; + } + + /** + * Provides content type e.g. application/json + * + * @return content type + */ + public String getContentType() { + return contentType; + } + + + /** + * Provides User name for the DMaaP MR Topic authentication + * + * @return user name + */ + public String getUserName() { + return userName; + } + + /** + * Provides User password for the DMaaP MR Topic authentication + * + * @return user Password + */ + public String getUserPassword() { + return userPassword; + } + + + /** + * Trims, adjusts casing and validates user input String for protocol selection + * + * @param protocol - User input for protocol String + * @return - network protocol e.g http or https + */ + protected static String normalizeValidateProtocol(final String protocol) { + // validate that only http and https are supported protocols are Supported for DMaaP MR + String normalizedProtocolString = protocol.trim().toLowerCase(Locale.ENGLISH); + if (normalizedProtocolString.isEmpty() || + !("http".equals(normalizedProtocolString) || "https".equals(normalizedProtocolString))) { + + final String errorMessage = + "Unsupported protocol selection. Only HTTPS and HTTPS are currently supported for DMaaP MR"; + + throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return normalizedProtocolString; + } + + + /** + * Trims, adjust casing and validates content type is supported by DMaaP. + * + * NOTE: DMaaP currently only support application/json content type + * + * @param contentType content type that needs to checked for DMaaP MR support + * @return true if content type is supported by DMaaP MR + */ + protected static String normalizeValidateContentType(final String contentType) { + // Current DMaaP MR is only supporting "application/json" content type + String normalizedContentType = contentType.trim().toLowerCase(Locale.ENGLISH); + final boolean isSupported = contentType.equals(HTTPUtils.JSON_APPLICATION_TYPE); + if (!isSupported) { + final String errorMessage = + "Unsupported content type selection. Only application/json is currently supported for DMaaP MR"; + + throw new DMaapException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return normalizedContentType; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DMaaPMRBaseConfig)) { + return false; + } + DMaaPMRBaseConfig that = (DMaaPMRBaseConfig) o; + return Objects.equal(hostName, that.hostName) && + Objects.equal(portNumber, that.portNumber) && + Objects.equal(topicName, that.topicName) && + Objects.equal(protocol, that.protocol) && + Objects.equal(userName, that.userName) && + Objects.equal(userPassword, that.userPassword) && + Objects.equal(contentType, that.contentType); + } + + @Override + public int hashCode() { + return Objects.hashCode(hostName, portNumber, topicName, protocol, userName, userPassword, contentType); + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java new file mode 100644 index 0000000..70fcf58 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRPublisherConfig.java @@ -0,0 +1,255 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + + +//TO-DO Lisence Issue ASK to Kedar??????????? +package org.onap.universalvesadapter.configs; + +import java.io.IOException; +import javax.annotation.Nonnull; +import org.onap.universalvesadapter.utils.DmaapConfig; +import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan; +import com.google.common.base.Objects; + +/** + * <p> + * Immutable DMaaP MR Configuration for DMaaP MR Publisher. + * <p> + * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber + * Configuration + * </p> + * <p> + * + * @author Rajiv Singla . Creation Date: 10/12/2016. + * + */ +@ComponentScan +public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig { + + + /** + * Publisher batching queue size + */ + private int maxBatchSize; + + /** + * Publisher Recovery Queue Size + */ + private int maxRecoveryQueueSize; + + /** + * Default uri path prefix + */ + private String dmaapUriPathPrefix ; + + private DMaaPMRPublisherConfig(@Nonnull String hostName, @Nonnull Integer portNumber, @Nonnull String topicName, + @Nonnull String protocol, String userName, String userPassword, @Nonnull String contentType, + int maxBatchSize, int maxRecoveryQueueSize,String dmaapUriPathPrefix) { + this.hostName = hostName; + this.portNumber = portNumber; + this.topicName = topicName; + this.protocol = protocol; + this.userName = userName; + this.userPassword = userPassword; + this.contentType = contentType; + this.maxBatchSize = maxBatchSize; + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + this.dmaapUriPathPrefix =dmaapUriPathPrefix; + } + + /** + * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private int maxBatchSize; + private int maxRecoveryQueueSize; + private String dmaapUriPathPrefix ; + + + + public Builder(@Nonnull String hostName, @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { + this.hostName = hostName; + this.topicName = topicName; + // Default values + this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); + this.userName = dmaapConfig.getDEFAULT_USER_NAME(); + this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); + this.protocol =dmaapConfig.getDEFAULT_PROTOCOL(); + this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); + + this.maxBatchSize =dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(); + this.maxRecoveryQueueSize = dmaapConfig.getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE(); + this.dmaapUriPathPrefix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); + } + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber + * custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + /** + * Setup user name for authentication. If no username is provided authentication + * will be disabled + * + * @param userName + * user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + /** + * Setup user password for authentication. If no password is provided + * authentication will be disabled + * + * @param userPassword + * user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + /** + * Setup custom Publisher protocol - Defaults to https. Note: Only http and + * https are currently supported. + * + * @param protocol + * protocol e.g. https + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + /** + * Setup custom Publisher content-type - Defaults to application/json + * + * @param contentType + * content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + /** + * Setup custom Publisher Max Batch Size - Defaults to 100 + * + * @param maxBatchSize + * max Batch Size + * @return Builder object itself for chaining + */ + public Builder setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + + /** + * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold + * messages temporarily in case DMaaP MR Publisher topic is not responding for + * any reason. Defaults to 100,000 + * + * @param maxRecoveryQueueSize + * max recovery queue size + * @return Builder object itself for chaining + */ + public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) { + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + return this; + } + + + /** + * Creates immutable instance of {@link DMaaPMRPublisherConfig} + * + * @return Builds and returns thread safe, immutable + * {@link DMaaPMRPublisherConfig} object + */ + public DMaaPMRPublisherConfig build() { + return new DMaaPMRPublisherConfig(hostName, portNumber, topicName, protocol, userName, userPassword, + contentType, maxBatchSize, maxRecoveryQueueSize,dmaapUriPathPrefix); + } + + } + public String getDmaapUriPathPrefix() { + return dmaapUriPathPrefix; + } + + /** + * Returns max Publisher Batch Queue Size + * + * @return max Publisher Batch Queue size + */ + public int getMaxBatchSize() { + return maxBatchSize; + } + + /** + * Returns max Publisher Recovery Queue Size + * + * @return max Recovery Queue size + */ + public int getMaxRecoveryQueueSize() { + return maxRecoveryQueueSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o; + return maxBatchSize == that.maxBatchSize && maxRecoveryQueueSize == that.maxRecoveryQueueSize; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize); + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java new file mode 100644 index 0000000..2b8158a --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaaPMRSubscriberConfig.java @@ -0,0 +1,337 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.configs; + +import java.io.IOException; + +import javax.annotation.Nonnull; + +import org.onap.universalvesadapter.utils.DmaapConfig; + +import com.google.common.base.Objects; + +/** + * <p> + * Immutable DMaaP MR Configuration for Subscriber. + * <p> + * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration + * <p> + * + * @author Rajiv Singla . Creation Date: 10/12/2016. + */ +public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig { + + private final String consumerId; + private final String consumerGroup; + private final int timeoutMS; + private final int messageLimit; + private String timeoutMSParam; + private String messageLimitParam; + private String uriPrefix; + + private DMaaPMRSubscriberConfig(@Nonnull String hostName, + @Nonnull Integer portNumber, + @Nonnull String topicName, + @Nonnull String protocol, + String userName, + String userPassword, + @Nonnull String contentType, + @Nonnull String consumerId, + @Nonnull String consumerGroup, + @Nonnull int timeoutMS, + @Nonnull int messageLimit, + String timeoutMSParam, + String messageLimitParam, + String uriPrefix) { + this.hostName = hostName; + this.portNumber = portNumber; + this.topicName = topicName; + this.protocol = protocol; + this.userName = userName; + this.userPassword = userPassword; + this.contentType = contentType; + this.consumerId = consumerId; + this.consumerGroup = consumerGroup; + this.timeoutMS = timeoutMS; + this.messageLimit = messageLimit; + this.timeoutMSParam=timeoutMSParam; + this.messageLimitParam=messageLimitParam; + this.uriPrefix=uriPrefix; + + + } + + /** + * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private String consumerId; + private String consumerGroup; + private int timeoutMS; + private int messageLimit; + private String timeoutMSParam; + private String messageLimitParam; + private String uriPreifix; + + + + public Builder(@Nonnull String hostName, + @Nonnull String topicName, DmaapConfig dmaapConfig) throws IOException { + + // Required Values + this.hostName = hostName; + this.topicName = topicName; + // Default values + this.portNumber = dmaapConfig.getDEFAULT_PORT_NUMBER(); + this.userName = dmaapConfig.getDEFAULT_USER_NAME(); + this.userPassword = dmaapConfig.getDEFAULT_USER_PASSWORD(); + this.protocol = dmaapConfig.getDEFAULT_PROTOCOL(); + this.contentType = dmaapConfig.getDEFAULT_CONTENT_TYPE(); + this.consumerId = dmaapConfig.getDMAAP_DEFAULT_CONSUMER_ID(); + this.consumerGroup = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(); + this.timeoutMS =dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(); + this.messageLimit = dmaapConfig.getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(); + this.timeoutMSParam=dmaapConfig.getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(); + this.messageLimitParam=dmaapConfig.getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(); + this.uriPreifix=dmaapConfig.getDMAAP_URI_PATH_PREFIX(); + + } + + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + + /** + * Setup user name for authentication. If no username is provided authentication will be disabled + * + * @param userName user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + + /** + * Setup user password for authentication. If no password is provided authentication will be disabled + * + * @param userPassword user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + + /** + * Setup custom Subscriber protocol - Defaults to https. + * Note: Only http and https are currently supported. + * + * @param protocol protocol e.g. https or http + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + /** + * Setup custom Subscriber content-type - Defaults to application/json + * + * @param contentType content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + + /** + * Setup custom Consumer Id - Defaults to random Id + * + * @param consumerId - custom consumer ID + * @return Builder object itself for chaining + */ + public Builder setConsumerId(@Nonnull String consumerId) { + this.consumerId = consumerId; + return this; + } + + /** + * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID + * + * @param consumerGroup - custom Consumer Group + * @return Builder object itself for chaining + */ + public Builder setConsumerGroup(@Nonnull String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + /** + * Setup Custom Subscriber timeout in ms - Default to no timeout limit + * + * @param timeoutMS timeout in milliseconds + * @return Builder object itself for chaining + */ + public Builder setTimeoutMS(@Nonnull int timeoutMS) { + this.timeoutMS = timeoutMS; + return this; + } + + /** + * Setup custom Subscriber Message Limit - Default to no limit + * + * @param messageLimit message Limit + * @return Builder object itself for chaining + */ + public Builder setMessageLimit(@Nonnull int messageLimit) { + this.messageLimit = messageLimit; + return this; + } + + /** + * Builds Immutable instance of {@link DMaaPMRSubscriberConfig} + * + * @return immutable DMaaP Subscriber Config Object + * @throws IOException + */ + public DMaaPMRSubscriberConfig build() throws IOException { + return new DMaaPMRSubscriberConfig(hostName, portNumber, topicName, protocol, userName, userPassword, + contentType, consumerId, consumerGroup, timeoutMS, messageLimit, timeoutMSParam, messageLimitParam, + uriPreifix); + } + + } + + + public String getTimeoutMSParam() { + return timeoutMSParam; + } + + public void setTimeoutMSParam(String timeoutMSParam) { + this.timeoutMSParam = timeoutMSParam; + } + + public String getMessageLimitParam() { + return messageLimitParam; + } + + public void setMessageLimitParam(String messageLimitParam) { + this.messageLimitParam = messageLimitParam; + } + + public String getUriPrefix() { + return uriPrefix; + } + + public void setUriPrefix(String uriPreifix) { + this.uriPrefix = uriPreifix; + } + + /** + * DMaaP MR Subscriber Consumer Id + * + * @return consumer Id + */ + public String getConsumerId() { + return consumerId; + } + + /** + * DMaaP MR Subscriber Consumer Group + * + * @return consumer group + */ + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * DMaaP MR Subscriber Timeout in ms + * + * @return subscriber timeout ms + */ + public int getTimeoutMS() { + return timeoutMS; + } + + /** + * DMaaP MR Subscriber message limit + * + * @return subscriber message limit + */ + public int getMessageLimit() { + return messageLimit; + } + + /** + * DMaaP property file + * + * @return Properties + */ + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o; + return Objects.equal(consumerId, that.consumerId) && + Objects.equal(consumerGroup, that.consumerGroup) && + Objects.equal(timeoutMS, that.timeoutMS) && + Objects.equal(messageLimit, that.messageLimit); + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit); + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java deleted file mode 100644 index 2b9a820..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DMaapMrUrlConfiguration.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.configs; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -/** - * Configuration for Dmaap MR Service - * - * @author kmalbari - * - */ -@Component -public class DMaapMrUrlConfiguration extends Configuration { - - @Value("${dmaap.url}") - private String url; - - @Value("${dmaap.consumer_props}") - private String consumerProperties; - - @Value("${dmaap.publisher_props}") - private String publisherProperties; - - public String getPublisherProperties() { - return publisherProperties; - } - - public String getConsumerProperties() { - return consumerProperties; - } - - public String getUrl() { - return url; - } - - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java deleted file mode 100644 index b1daf0d..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/DiskRepoConfiguration.java +++ /dev/null @@ -1,42 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.configs; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -/** - * - * Configuration for disk repository service - * - * @author kmalbari - * - */ -@Component -public class DiskRepoConfiguration extends Configuration { - - @Value("${fileService.url}") - private String fileRepositoryUrl; - - public String getFileRepositoryUrl() { - return fileRepositoryUrl; - } - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java index 3edca56..6f85ef3 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/configs/UniversalEventConfiguration.java @@ -33,12 +33,9 @@ import org.springframework.stereotype.Component; * */ @Component -public class UniversalEventConfiguration extends Configuration { +public class UniversalEventConfiguration{ private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - @Value("${snmpTrap.configFile}") - private String configFile; @Value("${universal.configFiles}") private String configFiles; @@ -68,6 +65,4 @@ public class UniversalEventConfiguration extends Configuration { } - //think about adding mapping files on runtime as well - } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java deleted file mode 100644 index e844270..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/MockDmaapController.java +++ /dev/null @@ -1,37 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.controller; - -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - - -@RestController -public class MockDmaapController { - - @RequestMapping("/greeting") - public String greeting(@RequestParam(value="name", defaultValue="World") String name) { -// return new Greeting(counter.incrementAndGet(), -// String.format(template, name)); - return "{ \"protocol version\": \"v2c\", \"notify OID\": \".1.3.6.1.4.1.74.2.46.12.1.1\", \"cambria.partition\": \"dcae-snmp.client.research.att.com\" , \"trap category\": \"UCSNMP-HEARTBEAT\", \"epoch_serno\": 15161177410000, \"community\" : \"public\", \"time received\": 1516117741, \"agent name\": \"localhost\", \"agent address\": \"127.0.0.1\", \"community len\": 6, \"notify OID len\": 12, \"varbinds\": [ { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.1\", \"varbind_value\": \"ucsnmp heartbeat - ignore\" }, { \"varbind_type\": \"octet\", \"varbind_oid\": \".1.3.6.1.4.1.74.2.46.12.1.1.2\", \"varbind_value\": \"Tue Jan 16 10:49:01 EST 2018\" } ] }"; - } - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java index 588c912..dfddfde 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/controller/VesController.java @@ -19,6 +19,8 @@ */ package org.onap.universalvesadapter.controller; +import org.onap.universalvesadapter.exception.MapperConfigException; +import org.onap.universalvesadapter.service.VESAdapterInitializer; import org.onap.universalvesadapter.service.VesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController; * * @author kmalbari */ + @RestController public class VesController { @@ -40,17 +43,35 @@ public class VesController { @Autowired private VesService vesService; + @Autowired + private VESAdapterInitializer vESAdapterInitializer; + /** * @return message that application is started */ @RequestMapping("/start") public String start() { - LOGGER.debug("UniversalVesAdapter Application starting..."); - vesService.start(); + LOGGER.info("UniversalVesAdapter Application starting..."); + + try { + vesService.start(); + } catch (MapperConfigException e) { + + LOGGER.error("Config error:{}",e.getMessage(),e.getCause()); + } return "Application started"; } + @RequestMapping("/reload") + public void reloadMappingFileFromDB() { + LOGGER.debug("Reload of Mapping File is started"); + vESAdapterInitializer.fetchMappingFile(); + LOGGER.debug("Reload of Mapping File is completed"); + } + + + /** * @return message that application stop process is triggered */ diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java new file mode 100644 index 0000000..efd0b0d --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/AnalyticsDMaaPModule.java @@ -0,0 +1,68 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap; + +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherImpl; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueFactory; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueueImpl; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberImpl; +import org.springframework.stereotype.Component; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + + + + +/** + * Guice Module to wire concrete implementations with interfaces + * <p> + * @author Rajiv Singla . Creation Date: 10/20/2016. + */ +@Component +public class AnalyticsDMaaPModule extends AbstractModule { + + + @Override + protected void configure() { + + // Bind Http Client + bind(CloseableHttpClient.class).toInstance(HttpClients.createDefault()); + + // Bind Publishing queue + install(new FactoryModuleBuilder().implement(DMaaPMRPublisherQueue.class, DMaaPMRPublisherQueueImpl.class) + .build(DMaaPMRPublisherQueueFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRPublisher.class, DMaaPMRPublisherImpl.class) + .build(DMaaPMRPublisherFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRSubscriber.class, DMaaPMRSubscriberImpl.class) + .build(DMaaPMRSubscriberFactory.class)); + + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java new file mode 100644 index 0000000..0638574 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/BaseDMaaPMRComponent.java @@ -0,0 +1,383 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap; + + +import static java.lang.String.format; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.LinkedList; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.util.EntityUtils; +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherQueue; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponse; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherResponseImpl; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponse; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberResponseImpl; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.utils.HTTPUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.ComponentScan; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; + +/** + * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods + * + * @author Rajiv Singla . Creation Date: 11/1/2016. + */ +@ComponentScan +public abstract class BaseDMaaPMRComponent { + + private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); + public BaseDMaaPMRComponent() {} + + + /** + * Creates Base64 encoded Auth Header for given userName and Password + * If either user name of password are null return absent + * + * @param userName username + * @param userPassword user password + * @return base64 encoded auth header if username or password are both non null + */ + protected static Optional<String> getAuthHeader(@Nullable final String userName, + @Nullable final String userPassword) { + if (userName == null || userPassword == null) { + return Optional.absent(); + } else { + final String auth = userName + ":" + userPassword; + final Charset isoCharset = Charset.forName("ISO-8859-1"); + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset)); + return Optional.of("Basic " + new String(encodedAuth, isoCharset)); + } + } + + + /** + * Creates Publisher URI for given {@link DMaaPMRPublisherConfig} + * + * @param publisherConfig publisher settings + * + * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic + */ + protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) { + final String hostName = publisherConfig.getHostName(); + final Integer portNumber = publisherConfig.getPortNumber(); + final String getProtocol = publisherConfig.getProtocol(); + final String topicName = publisherConfig.getTopicName(); + final String dmaapUriPathPrefix =publisherConfig.getDmaapUriPathPrefix(); + URI publisherURI = null; + try { + publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) + .setPath(dmaapUriPathPrefix + topicName).build(); + } catch (URISyntaxException e) { + final String errorMessage = format("Error while creating publisher URI: %s", e); + throw new DMaapException(errorMessage, LOG, e); + } + LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI); + return publisherURI; + } + + + /** + * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig} + * + * @param subscriberConfig subscriber settings + * + * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic + */ + protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) { + final String hostName = subscriberConfig.getHostName(); + final Integer portNumber = subscriberConfig.getPortNumber(); + final String getProtocol = subscriberConfig.getProtocol(); + final String topicName = subscriberConfig.getTopicName(); + final String consumerId = subscriberConfig.getConsumerId(); + final String consumerGroup = subscriberConfig.getConsumerGroup(); + final Integer timeoutMS = subscriberConfig.getTimeoutMS(); + final Integer messageLimit = subscriberConfig.getMessageLimit(); + URI subscriberURI = null; + + try { + URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) + .setPath(subscriberConfig.getUriPrefix() + + topicName + "/" + + consumerGroup + "/" + + consumerId); + // add query params if present + if (timeoutMS > 0) { + uriBuilder.addParameter(subscriberConfig.getTimeoutMSParam(), timeoutMS.toString()); + } + if (messageLimit > 0) { + uriBuilder.addParameter(subscriberConfig.getMessageLimitParam(), + messageLimit.toString()); + } + subscriberURI = uriBuilder.build(); + + } catch (URISyntaxException e) { + final String errorMessage = format("Error while creating subscriber URI: %s", e); + throw new DMaapException(errorMessage, LOG, e); + } + + LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI); + return subscriberURI; + } + + + /** + * Creates 202 (Accepted) Response code message + * + * @param batchQueueSize batch Queue size + * + * @return response with 202 message code + */ + protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) { + return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE, + "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize); + } + + + /** + * Creates 204 (No Content) Response code message + * + * @return response with 204 message code + */ + protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() { + return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE, + "No Content - No Messages in batch queue for flushing to MR Topic", 0); + } + + + /** + * Creates Publisher Response for given response code, response Message and pending Message Count + * + * @param responseCode HTTP Status Code + * @param responseMessage response message + * @param pendingMessages pending messages in batch queue + * + * @return DMaaP MR Publisher Response + */ + protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String + responseMessage, int pendingMessages) { + return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages); + } + + + /** + * Returns weekly consistent pending messages in batch queue + * + * @param publisherQueue batch queue + * @param publisherConfig publisher settings + * + * @return pending messages to be published + */ + protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue, + @Nonnull final DMaaPMRPublisherConfig publisherConfig) { + return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize(); + } + + + /** + * Creates Subscriber Response for give response Code, response Message and fetch messages + * + * @param responseCode response Code + * @param responseMessage response Message + * @param fetchedMessages fetched messages + * + * @return DMaaP MR Subscriber Response + */ + protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String + responseMessage, List<String> fetchedMessages) { + if (fetchedMessages == null) { + return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage); + } else { + return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages); + } + } + + + /** + * Custom response handler which extract status code and response body + * + * @return Pair containing Response code and response body + */ + protected static ResponseHandler<Pair<Integer, String>> responseHandler() { + return new ResponseHandler<Pair<Integer, String>>() { + @Override + public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException { + // Get Response status code + final int status = response.getStatusLine().getStatusCode(); + final HttpEntity responseEntity = response.getEntity(); + // If response entity is not null - extract response body as string + String responseEntityString = ""; + if (responseEntity != null) { + responseEntityString = EntityUtils.toString(responseEntity); + } + return new ImmutablePair<>(status, responseEntityString); + } + }; + } + + + /** + * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will + * be lost + * + * @param publisherQueue publisher queue + * @param messages recoverable messages to be published to recovery queue + */ + protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue, + List<String> messages) { + try { + publisherQueue.addRecoverableMessages(messages); + + LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}", + messages.size(), publisherQueue.getBatchQueueRemainingSize()); + + } catch (IllegalStateException e) { + final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " + + "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d", + messages.size(), publisherQueue.getRecoveryQueueRemainingSize()); + throw new DMaapException(errorMessage, LOG, e); + } + } + + + + + + /** + * Converts List of messages to Json String Array which can be published to DMaaP MR topic. + * + * @param messages messages that need to parsed to Json Array representation + * @return json string representation of message + */ + protected static String convertToJsonString(@Nullable final List<String> messages) { + // If messages are null or empty just return empty array + if (messages == null || messages.isEmpty()) { + return "[]"; + } + + + List<JsonNode> jsonMessageObjectsList = new LinkedList<>(); + + try { + for (String message : messages) { + final JsonNode jsonNode = objectMapper.readTree(message); + jsonMessageObjectsList.add(jsonNode); + } + return objectMapper.writeValueAsString(jsonMessageObjectsList); + } catch (JsonProcessingException e) { + final String errorMessage = + format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s", + messages, e); + throw new DMaapException(errorMessage, LOG, e); + + } catch (IOException e) { + final String errorMessage = + format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s", + messages, e); + throw new DMaapException(errorMessage, LOG, e); + } + } + + + /** + * Converts subscriber messages json string to List of messages. If message Json String is empty + * or null + * + * @param messagesJsonString json messages String + * + * @return List containing DMaaP MR Messages + */ + protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) { + + final LinkedList<String> messages = new LinkedList<>(); + + // If message string is not null or not empty parse json message array to List of string messages + if (messagesJsonString != null && !messagesJsonString.trim().isEmpty() + && !("[]").equals(messagesJsonString.trim())) { + + try { + // get root node + final JsonNode rootNode = objectMapper.readTree(messagesJsonString); + // iterate over root node and parse arrays messages + for (JsonNode jsonNode : rootNode) { + // if array parse it is array of messages + final String incomingMessageString = jsonNode.toString(); + if (jsonNode.isArray()) { + final List messageList = objectMapper.readValue(incomingMessageString, List.class); + for (Object message : messageList) { + final String jsonMessageString = objectMapper.writeValueAsString(message); + addUnescapedJsonToMessage(messages, jsonMessageString); + } + } else { + // parse it as object + addUnescapedJsonToMessage(messages, incomingMessageString); + } + } + + } catch (IOException e) { + final String errorMessage = + format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," + + " Json Error: %s", messagesJsonString, e); + throw new DMaapException(errorMessage, LOG, e); + } + + } + return messages; + } + + /** + * Adds unescaped Json messages to given messages list + * + * @param messages message list in which unescaped messages will be added + * @param incomingMessageString incoming message string that may need to be escaped + */ + private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) { + if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) { + messages.add(StringEscapeUtils.unescapeJava( + incomingMessageString.substring(1, incomingMessageString.length() - 1))); + } else { + messages.add(StringEscapeUtils.unescapeJava(incomingMessageString)); + } + } + + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java new file mode 100644 index 0000000..cd76f79 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/Creator.java @@ -0,0 +1,93 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.universalvesadapter.dmaap; + +import java.io.IOException; + +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.utils.DmaapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class Creator { + + private final Logger LOGGER = LoggerFactory.getLogger(Creator.class); + private DMaaPMRFactory dMaaPMRFactoryInstance; + private String hostname; + private String publisherTopic; + private String subcriberTopic; + private DmaapConfig dmaapConfig; + + @Autowired + public void setDmaapConfig(DmaapConfig dmaapConfig) { + this.dmaapConfig = dmaapConfig; + } + + public Creator() { + + } + + // prop initializer + + public void propertyFileInitializer() { + + this.hostname = dmaapConfig.getHostname(); + this.publisherTopic = dmaapConfig.getPublisherTopic(); + this.subcriberTopic = dmaapConfig.getSubscriberTopic(); + this.dMaaPMRFactoryInstance = DMaaPMRFactory.create(); + LOGGER.info("The Hostname of DMaap is :" + hostname); + + + } + + // publisher + public DMaaPMRPublisher getDMaaPMRPublisher(){ + propertyFileInitializer(); + DMaaPMRPublisherConfig dMaaPMRPublisherConfig = null; + try { + dMaaPMRPublisherConfig = new DMaaPMRPublisherConfig.Builder(hostname, publisherTopic,dmaapConfig).build(); + } catch (IOException e) { + LOGGER.error("failed or interrupted I/O operations while creating publisher config:{}",e.getCause()); + } + return dMaaPMRFactoryInstance.createPublisher(dMaaPMRPublisherConfig); + } + + // subscriber + public DMaaPMRSubscriber getDMaaPMRSubscriber(){ + propertyFileInitializer(); + DMaaPMRSubscriberConfig dMaaPMRSubscriberConfig = null; + try { + dMaaPMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(hostname, subcriberTopic, dmaapConfig).build(); + } catch (IOException e) { + + LOGGER.error("failed or interrupted I/O operations while creating subcriber config:{}",e.getCause()); + } + + return dMaaPMRFactoryInstance.createSubscriber(dMaaPMRSubscriberConfig); + + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java new file mode 100644 index 0000000..50f214c --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/DMaaPMRFactory.java @@ -0,0 +1,116 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap; + +import javax.annotation.Nonnull; + +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisherFactory; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriberFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; + + +/** + * Creates pre injected implementations for {@link DMaaPMRPublisher} and {@link DMaaPMRSubscriber} + * <p> + * Usage: + * <p>Create an instance of DMaaP MR Factory</p> + * <pre> + * DMaaPFactory dmaapFactory = DMaaPFactory.initalize() + * </pre> + * <p>Create a new DMaaP MR Publisher</p> + * <pre> + * DMaaPMRPublisher publisher = dmaapFactory.createPublisher(publisherConfig) + * </pre> + * <p>Create new DMaaP MR Subscriber</p> + * <pre> + * DMaaPMRSubscriber subscriber = dmaapFactory.createSubscriber(subscriberConfig) + * </pre> + * <p> + * <strong>All Clients must use this Factory to initalize DMaaP Message Router Publishers and Subscribers</strong> + * </p> + * <p> + * @author Rajiv Singla . Creation Date: 10/20/2016. + */ + +public class DMaaPMRFactory { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRFactory.class); + + private final Injector injector; + + public DMaaPMRFactory(AbstractModule guiceModule) { + injector = Guice.createInjector(guiceModule); + } + + /** + * Returns configured instance of {@link DMaaPMRPublisher} + * + * @param publisherConfig Publisher Config + * @return configured instance of DMaaP MR Publisher + */ + public DMaaPMRPublisher createPublisher(@Nonnull DMaaPMRPublisherConfig publisherConfig) { + final DMaaPMRPublisherFactory publisherFactory = injector.getInstance(Key.get(DMaaPMRPublisherFactory.class)); + LOG.debug("Creating new DMaaP MR Publisher Instance with configuration: {}", publisherConfig); + final DMaaPMRPublisher dMaaPMRPublisher = publisherFactory.create(publisherConfig); + + LOG.info("Created new DMaaP MR Publisher Instance. Publisher creation time: {}", + dMaaPMRPublisher.getPublisherCreationTime()); + return dMaaPMRPublisher; + } + + /** + * Returns configured instance of {@link DMaaPMRSubscriber} + * + * @param subscriberConfig Subscriber Config + * @return configured instance of DMaaP MR Subscriber + */ + public DMaaPMRSubscriber createSubscriber(@Nonnull DMaaPMRSubscriberConfig subscriberConfig) { + final DMaaPMRSubscriberFactory subscriberFactory = injector.getInstance(DMaaPMRSubscriberFactory.class); + LOG.debug("Creating new DMaaP MR Subscriber Instance with configuration: {}", subscriberConfig); + final DMaaPMRSubscriber dMaaPMRSubscriber = subscriberFactory.create(subscriberConfig); + LOG.info("Created new DMaaP MR Subscriber Instance. Subscriber creation time: {}", + dMaaPMRSubscriber.getSubscriberCreationTime()); + return dMaaPMRSubscriber; + } + + /** + * Creates an instance of {@link DMaaPMRFactory} + * + * @return {@link DMaaPMRFactory} factory instance + */ + public static DMaaPMRFactory create() { + final DMaaPMRFactory dMaaPMRFactory = new DMaaPMRFactory(new AnalyticsDMaaPModule()); + LOG.info("Created new instance of DMaaP MR Factory"); + return dMaaPMRFactory; + } + + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java new file mode 100644 index 0000000..6d867b9 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisher.java @@ -0,0 +1,91 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; +import java.util.Date; +import java.util.List; + + +/** + * <p> + * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics. + * <p> + * + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public interface DMaaPMRPublisher extends AutoCloseable { + + + /** + * <p> + * Adds collection of messages to DMaaP MR Topic Publishing Queue. + * <p> + * Note: Invoking this method may or may not cause publishing immediately + * as publishing in done is batch mode by default. Parameter maxBatchSize + * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size. + * If the maxBatchSize is reached all message will be published automatically + * during subsequent call. + * </p> + * + * @param messages messages to publish to DMaaP MR Publisher + * @return response which may contain Http Response code 202 (Accepted) as publishing + * will proceed when max batch size is reached. Throws {@link DMaapException} + * if publishing fails + */ + DMaaPMRPublisherResponse publish(List<String> messages); + + + /** + * <p> + * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse} + * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic. + * </p> + * + * @param messages messages to publish to DMaaP MR Publisher + * @return DMaaP Message Router Publisher Response. Throws {@link DMaapException} + * if force publishing fails + * + */ + DMaaPMRPublisherResponse forcePublish(List<String> messages); + + + /** + * <p> + * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns + * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to + * be flushed response code 304 (Not Modified) will be returned + * </p> + * + * @return DMaaP Message Router Publisher Response + */ + DMaaPMRPublisherResponse flush(); + + + /** + * <p> + * Returns the creation time when Publisher instance was created. + * <p> + * + * @return creation time of Subscriber instance + */ + Date getPublisherCreationTime(); + + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java new file mode 100644 index 0000000..b60c74c --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherFactory.java @@ -0,0 +1,48 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; + +/** + * <p> + * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes. + * <p> + * <strong> + * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they + * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize + * guice injected Publisher instances + * </strong> + * <p> + * @author Rajiv Singla . Creation Date: 10/20/2016. + */ +public interface DMaaPMRPublisherFactory { + + /** + * Guice Factory to create DMaaP MR Publisher + * + * @param publisherConfig publisher config + * + * @return DMaaP MR Publisher instance + */ + DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig); + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java new file mode 100644 index 0000000..5930f41 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherImpl.java @@ -0,0 +1,220 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import static java.lang.String.format; + + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpHeaders; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig; +import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.utils.HTTPUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; + +import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + + + +/** + * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient} + * + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +@ComponentScan +public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher { + + @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}") + private int publisherMaxFlushRetries; + + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class); + + public int a =2; + private final DMaaPMRPublisherConfig publisherConfig; + private final CloseableHttpClient closeableHttpClient; + private final DMaaPMRPublisherQueue publisherQueue; + private final Date publisherCreationTime; + private URI publisherUri; + + @Inject + public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig, + DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory, + CloseableHttpClient closeableHttpClient){ + + this.publisherConfig = publisherConfig; + final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1; + this.publisherQueue = dMaaPMRPublisherQueueFactory.create( + maxBatchSize, publisherConfig.getMaxRecoveryQueueSize()); + this.closeableHttpClient = closeableHttpClient; + this.publisherUri = createPublisherURI(publisherConfig); + this.publisherCreationTime = new Date(); + } + + + @Override + public DMaaPMRPublisherResponse publish(List<String> messages) { + + final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize(); + + // if messages size is less than batch queue size - just queue them for batch publishing + if (batchQueueRemainingSize > messages.size()) { + LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}", + messages.size(), batchQueueRemainingSize); + final int batchQueueSize = publisherQueue.addBatchMessages(messages); + return createPublisherAcceptedResponse(batchQueueSize); + + } else { + + // grab all already queued messages, append current messages and force publish them to DMaaP MR topic + final List<String> queueMessages = publisherQueue.getMessageForPublishing(); + LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " + + "Publisher Topic."); + return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages))); + } + + } + + @Override + public DMaaPMRPublisherResponse forcePublish(List<String> messages) { + + LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size()); + + final String contentType = publisherConfig.getContentType(); + final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName(); + final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword(); + final HttpPost postRequest = new HttpPost(publisherUri); + + // add Authorization Header if username and password are present + final Optional<String> authHeader = getAuthHeader(userName, userPassword); + if (authHeader.isPresent()) { + postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); + } else { + LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present."); + } + + // Create post string entity + final String messagesJson = convertToJsonString(messages); + final StringEntity requestEntity = + new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8")); + postRequest.setEntity(requestEntity); + + try { + final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler()); + final Integer responseCode = responsePair.getLeft(); + final String responseBody = responsePair.getRight(); + // if messages were published successfully, return successful response + if (HTTPUtils.isSuccessfulResponseCode(responseCode)) { + LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " + + "Body: {}, Number of Messages published: {}", + responseCode, responseBody, messages.size()); + + } else { + LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " + + "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody); + addMessagesToRecoveryQueue(publisherQueue, messages); + } + + return createPublisherResponse(responseCode, responseBody, + getPendingMessages(publisherQueue, publisherConfig)); + + } catch (IOException e) { + // If IO Error then we need to also put messages in recovery queue + addMessagesToRecoveryQueue(publisherQueue, messages); + final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " + + "Messages will be queued in recovery queue. Messages Size: %d", messages.size()); + + throw new DMaapException(errorMessage, LOG, e); + } + + } + + + @Override + public DMaaPMRPublisherResponse flush() { + final List<String> queueMessages = publisherQueue.getMessageForPublishing(); + // If there are no message return 204 (No Content) response code + if (queueMessages.isEmpty()) { + LOG.debug("No messages to publish to batch queue. Returning 204 status code"); + return createPublisherNoContentResponse(); + } else { + // force publish messages in queue + return forcePublish(queueMessages); + } + } + + @Override + public Date getPublisherCreationTime() { + return new Date(publisherCreationTime.getTime()); + } + + @Override + public void close() throws Exception { + + // flush current message in the queue + int retrialNumber = 0; + int flushResponseCode; + + // automatic retries if messages cannot be flushed + do { + retrialNumber++; + DMaaPMRPublisherResponse flushResponse = flush(); + flushResponseCode = flushResponse.getResponseCode(); + + if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) { + LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " + + "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber, + publisherMaxFlushRetries); + + Thread.sleep(publisherMaxFlushRetries); + } + } while (retrialNumber <= publisherMaxFlushRetries && + !HTTPUtils.isSuccessfulResponseCode(flushResponseCode)); + + if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) { + LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented"); + } else { + LOG.info("Successfully published all batched messages to publisher."); + } + + // close http client + closeableHttpClient.close(); + + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java new file mode 100644 index 0000000..18feec8 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueue.java @@ -0,0 +1,87 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import java.util.List; + +/** + * <p> + * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic + * is offline for some reason. It does so by having a recovery queue which keeps + * messages in order in case there is temporary interruption in DMaaP Publisher + * </p> + * + * @author Rajiv Singla . Creation Date: 11/1/2016. + */ +public interface DMaaPMRPublisherQueue { + + /** + * <p> + * Add batchMessages to Batch Queue + * </p> + * + * @param batchMessages messages that needs to be added to batch queue + * @return current size of batch queue. Throws {@link IllegalStateException} + * if batch queue does not have enough space + */ + int addBatchMessages(List<String> batchMessages); + + + /** + * <p> + * Add recoverable messages to Recoverable Queue + * </p> + * + * @param recoverableMessages messages that needs to be added to recoverable queue + * @return current size of the recoverable queue. Throws {@link IllegalStateException} + * if recoverable queue does not have enough space + */ + int addRecoverableMessages(List<String> recoverableMessages); + + /** + * <p> + * Get messages that need to be published to DMaaP topic. Messages in recoverable + * queue are appended if present. + * </p> + * + * @return List of messages from both batch and recovery queue + */ + List<String> getMessageForPublishing(); + + /** + * <p> + * Remaining capacity of Batch Queue + * </p> + * + * @return Remaining Batch Queue Size + */ + int getBatchQueueRemainingSize(); + + /** + * <p> + * Remaining capacity of Recovery Queue + * </p> + * + * @return Remaining Recovery Queue Size + */ + int getRecoveryQueueRemainingSize(); + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java new file mode 100644 index 0000000..f100210 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueFactory.java @@ -0,0 +1,45 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import com.google.inject.assistedinject.Assisted; + +/** + * <p> + * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes. + * <p> + * + * @author Rajiv Singla . Creation Date: 11/1/2016. + */ +public interface DMaaPMRPublisherQueueFactory { + + /** + * Guice Factory to create DMaaP MR Publisher Queue + * + * @param batchQueueSize batch queue size + * @param recoveryQueueSize recovery queue size + * + * @return instance of DMaaP MR Publisher Queue + */ + DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize, + @Assisted("recoveryQueueSize") int recoveryQueueSize); + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java new file mode 100644 index 0000000..fc240d6 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherQueueImpl.java @@ -0,0 +1,126 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; + +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Lists.newLinkedList; +import static java.util.Collections.unmodifiableList; + +/** + * <p> + * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque} + * for batch and recovery queues + * </p> + * + * + * @author Rajiv Singla . Creation Date: 11/1/2016. + */ +public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class); + + private final LinkedBlockingDeque<String> batchQueue; + private final LinkedBlockingDeque<String> recoveryQueue; + + @Inject + public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize, + @Assisted("recoveryQueueSize") int recoveryQueueSize) { + batchQueue = new LinkedBlockingDeque<>(batchQueueSize); + recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize); + LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}", + batchQueueSize, recoveryQueueSize); + } + + @Override + public synchronized int addBatchMessages(List<String> batchMessages) { + + // checks if batchMessages size does not exceed batch queue capacity + if (batchMessages.size() > batchQueue.remainingCapacity()) { + throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue"); + } + + // Add batchMessages to batch queue + for (String message : batchMessages) { + batchQueue.add(message); + } + + // returns current elements size in batch queue + return batchQueue.size(); + } + + @Override + public synchronized int addRecoverableMessages(List<String> recoverableMessages) { + + // checks if messages size does not exceed recovery queue size + if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) { + throw new IllegalStateException("Not enough capacity to add messages in recovery queue"); + } + + // add messages to recovery queue + for (String recoverableMessage : recoverableMessages) { + recoveryQueue.add(recoverableMessage); + } + + // returns current size of recovery queue + return recoveryQueue.size(); + } + + @Override + public synchronized List<String> getMessageForPublishing() { + + final List<String> recoveryMessageList = new LinkedList<>(); + final List<String> batchMessagesList = new LinkedList<>(); + + // get messages from recovery queue if present + if (!recoveryQueue.isEmpty()) { + final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList); + LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize); + } + + // get messages from batch queue if present + if (!batchQueue.isEmpty()) { + final int batchQueueSize = batchQueue.drainTo(batchMessagesList); + LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize); + } + + // concat recovery and batch queue elements + return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList))); + } + + @Override + public synchronized int getBatchQueueRemainingSize() { + return batchQueue.remainingCapacity(); + } + + @Override + public synchronized int getRecoveryQueueRemainingSize() { + return recoveryQueue.remainingCapacity(); + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java new file mode 100644 index 0000000..ee12cc8 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponse.java @@ -0,0 +1,50 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +/** + * <p> + * Contract for all DMaaPMR Publisher Response + * <p> + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public interface DMaaPMRPublisherResponse { + + /** + * Gets HTTP Response Code + * + * @return HTTP Response code as String + */ + Integer getResponseCode(); + + /** + * Gets Response Message + * + * @return Response Message + */ + String getResponseMessage(); + /** + * Gets number of pending messages + * + * @return pending messages in the batch queue + */ + int getPendingMessagesCount(); +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java new file mode 100644 index 0000000..4570f78 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRPublisher/DMaaPMRPublisherResponseImpl.java @@ -0,0 +1,60 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRPublisher; + +import javax.annotation.Nonnull; + +/** + * <p> + * An simple implementation of {@link DMaaPMRPublisherResponse} + * <p> + * + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public class DMaaPMRPublisherResponseImpl implements DMaaPMRPublisherResponse { + + private final Integer responseCode; + private final String responseMessage; + private final int pendingMessagesCount; + + public DMaaPMRPublisherResponseImpl(@Nonnull Integer responseCode, @Nonnull String responseMessage, + int pendingMessagesCount) { + this.responseCode = responseCode; + this.responseMessage = responseMessage; + this.pendingMessagesCount = pendingMessagesCount; + } + + @Override + public Integer getResponseCode() { + return responseCode; + } + + @Override + public String getResponseMessage() { + return responseMessage; + } + + @Override + public int getPendingMessagesCount() { + return pendingMessagesCount; + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java new file mode 100644 index 0000000..336e3dd --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriber.java @@ -0,0 +1,56 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRSubcriber; + + + +import java.util.Date; + +/** + * <p> + * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics. + * <p> + * + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public interface DMaaPMRSubscriber extends AutoCloseable { + + /** + * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters + * for messageLimit and message timeout are used + * + * @return DMaaP Message Router Subscriber Response + */ + DMaaPMRSubscriberResponse fetchMessages(); + + + /** + * Returns the Subscriber instance creation time + * <p> + * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which + * are published to the topic after the creation of the Subscriber. + * + * @return creation time of Subscriber instance + */ + Date getSubscriberCreationTime(); + + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java new file mode 100644 index 0000000..dcb6f4c --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberFactory.java @@ -0,0 +1,46 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRSubcriber; + +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; + +/** + * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes. + * <p> + * <strong> + * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they + * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize + * guice injected Subscriber instances + * </strong> + * <p> + * @author Rajiv Singla . Creation Date: 10/20/2016. + */ +public interface DMaaPMRSubscriberFactory { + + /** + * Guice Factory to create DMaaP MR Subscriber Instance + * + * @param subscriberConfig subscriber config + * + * @return DMaaP MR Subscriber instance + */ + DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig); +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java new file mode 100644 index 0000000..2e7aac9 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberImpl.java @@ -0,0 +1,128 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRSubcriber; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpHeaders; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.onap.universalvesadapter.configs.DMaaPMRSubscriberConfig; +import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.utils.HTTPUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import static java.lang.String.format; + +/** + * Concrete Implementation of {@link DMaaPMRSubscriber} which uses + * {@link HttpClient} + * + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class); + + private final DMaaPMRSubscriberConfig subscriberConfig; + private final CloseableHttpClient closeableHttpClient; + private final URI subscriberUri; + private final Date subscriberCreationTime; + + @Inject + public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig, + CloseableHttpClient closeableHttpClient) { + this.subscriberConfig = subscriberConfig; + this.closeableHttpClient = closeableHttpClient; + this.subscriberUri = createSubscriberURI(subscriberConfig); + this.subscriberCreationTime = new Date(); + } + + @Override + public DMaaPMRSubscriberResponse fetchMessages() { + + final String userName = (subscriberConfig.getUserName().equals("null")) ? null : subscriberConfig.getUserName(); + final String userPassword = (subscriberConfig.getUserPassword().equals("null")) ? null + : subscriberConfig.getUserPassword(); + final HttpGet getRequest = new HttpGet(subscriberUri); + + // add Authorization Header if username and password are present + final Optional<String> authHeader = getAuthHeader(userName, userPassword); + if (authHeader.isPresent()) { + getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); + } else { + LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present."); + } + + try { + + final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler()); + final Integer responseCode = responsePair.getLeft(); + final String responseBody = responsePair.getRight(); + + List<String> fetchedMessages = new LinkedList<>(); + String responseMessage = responseBody; + + // if messages were published successfully, return successful response + if (HTTPUtils.isSuccessfulResponseCode(responseCode)) { + if (responseBody != null) { + fetchedMessages = convertJsonToStringMessages(responseBody); + responseMessage = "Messages Fetched Successfully"; + } else { + responseMessage = "DMaaP Response Body had no messages"; + } + } else { + LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " + + "DMaaP Response Body: {}", responseCode, responseBody); + } + + return createSubscriberResponse(responseCode, responseMessage, fetchedMessages); + + } catch (IOException e) { + + final String errorMessage = format("IO Exception while fetching messages from DMaaP Topic. Exception %s", + e); + throw new DMaapException(errorMessage, LOG, e); + } + + } + + @Override + public Date getSubscriberCreationTime() { + return new Date(subscriberCreationTime.getTime()); + } + + @Override + public void close() throws Exception { + closeableHttpClient.close(); + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java new file mode 100644 index 0000000..98b6b01 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponse.java @@ -0,0 +1,53 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRSubcriber; + +import java.util.List; + +/** + * <p> + * Contract for all DMaaP MR Subscriber Responses + * </p> + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public interface DMaaPMRSubscriberResponse { + + /** + * Gets HTTP Response Code + * + * @return HTTP Response code as String + */ + Integer getResponseCode(); + + /** + * Gets Response Message + * + * @return Response Message + */ + String getResponseMessage(); + /** + * Returns message fetched from DMaaP MR Topic + * + * @return collection of actual message retrieved from DMaaP MR Topic + */ + List<String> getFetchedMessages(); + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java new file mode 100644 index 0000000..e318359 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/dmaap/MRSubcriber/DMaaPMRSubscriberResponseImpl.java @@ -0,0 +1,79 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.dmaap.MRSubcriber; + +import static java.util.Collections.unmodifiableList; + +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableList; + +/** + * <p> + * A simple implementation for {@link DMaaPMRSubscriberResponse} + * <p> + * @author Rajiv Singla . Creation Date: 10/13/2016. + */ +public class DMaaPMRSubscriberResponseImpl implements DMaaPMRSubscriberResponse { + + private final Integer responseCode; + private final String responseMessage; + private final List<String> fetchedMessages; + + public DMaaPMRSubscriberResponseImpl(@Nonnull Integer responseCode, + @Nonnull String responseMessage, + @Nullable List<String> fetchedMessages) { + this.responseCode = responseCode; + this.responseMessage = responseMessage; + this.fetchedMessages = fetchedMessages != null ? fetchedMessages : ImmutableList.<String>of(); + } + + public DMaaPMRSubscriberResponseImpl(Integer responseCode, String responseMessage) { + this(responseCode, responseMessage, null); + } + + @Override + public Integer getResponseCode() { + return responseCode; + } + + @Override + public String getResponseMessage() { + return responseMessage; + } + + @Override + public List<String> getFetchedMessages() { + return unmodifiableList(fetchedMessages); + } + + @Override + public String toString() + { + + return responseMessage; + + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java deleted file mode 100644 index 796fe70..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/domain/ConfigFileData.java +++ /dev/null @@ -1,57 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.domain; - -import org.springframework.data.annotation.Id; - -/** - * A domain wrapper class for saving the config file in Mongo DB - * - * @author kmalbari - * - */ -public class ConfigFileData { - - - @Id private String id; - - private String xmlFileName; - - private String xmlContent; - - public String getXmlFileName() { - return xmlFileName; - } - - public void setXmlFileName(String xmlFileName) { - this.xmlFileName = xmlFileName; - } - - public String getXmlContent() { - return xmlContent; - } - - public void setXmlContent(String xmlContent) { - this.xmlContent = xmlContent; - } - - - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java deleted file mode 100644 index 625b021..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.exception; - -/** - * Exception when unable to connect to Config file Disk repository - * - * @author kmalbari - * - */ -public class ConfigFileReadException extends VesException { - - /** - * - */ - private static final long serialVersionUID = 414953072485703000L; - - public ConfigFileReadException(String exceptionMessage) { - super(exceptionMessage); - } - - public ConfigFileReadException(String exceptionMessage, Exception exception) { - super(exceptionMessage, exception); - } - - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java index 7055bc0..1f642b2 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java @@ -28,9 +28,6 @@ package org.onap.universalvesadapter.exception; */ public class ConfigFileSmooksConversionException extends VesException { - /** - * - */ private static final long serialVersionUID = 7128340575013771888L; public ConfigFileSmooksConversionException(String string) { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java index 7a35f83..3b30fdd 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java @@ -19,17 +19,18 @@ */ package org.onap.universalvesadapter.exception; +import org.slf4j.Logger; + + /** * Exception generated when dealing with communication to DMaap MR API * * @author kmalbari * */ -public class DMaapException extends VesException { +public class DMaapException extends RuntimeException { - /** - * - */ + private static final long serialVersionUID = 7045766597511192878L; public DMaapException(String string) { @@ -39,6 +40,26 @@ public class DMaapException extends VesException { public DMaapException(String string, Exception exception) { super(string, exception); } + + /** + * @param message - Error Message for Exception + * @param cause - Actual Exception which caused {@link DMaapException} + */ + public DMaapException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates and logs the DCAE Runtime Exception to given logger + * + * @param message - Error Message for Exception and logging + * @param logger - Logger used for logging exception + * @param cause - Actual exception which caused {@link DMaapException} + */ + public DMaapException(String message, Logger logger, Throwable cause) { + super(message, cause); + logger.error(message); + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java index 3dfa034..b4ebcc5 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java @@ -27,9 +27,6 @@ package org.onap.universalvesadapter.exception; */ public class MapperConfigException extends VesException { - /** - * - */ private static final long serialVersionUID = -7876042513908918292L; public MapperConfigException(String string) { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java index fd11b89..12c74cb 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java @@ -28,10 +28,6 @@ package org.onap.universalvesadapter.exception; */ public class VesException extends Exception { - - /** - * - */ private static final long serialVersionUID = -8549819066568432382L; public VesException(String string) { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java index e34b98a..06d8249 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/mappingconfig/Evaluation.java @@ -130,24 +130,7 @@ public class Evaluation { public String toString() { return new ToStringBuilder(this).append("operand", operand).append("field", field).append("value", value).append("datatype", datatype).append("lhs", lhs).append("rhs", rhs).append("additionalProperties", additionalProperties).toString(); } - /* - @Override - public int hashCode() { - return new HashCodeBuilder().append(field).append(additionalProperties).append(value).append(rhs).append(datatype).append(operand).append(lhs).toHashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if ((other instanceof Evaluation) == false) { - return false; - } - Evaluation rhs = ((Evaluation) other); - return new EqualsBuilder().append(field, rhs.field).append(additionalProperties, rhs.additionalProperties).append(value, rhs.value).append(rhs, rhs.rhs).append(datatype, rhs.datatype).append(operand, rhs.operand).append(lhs, rhs.lhs).isEquals(); - } -*/ + @Override public int hashCode() { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java index 1e6006a..43766ef 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java @@ -35,11 +35,6 @@ import org.springframework.stereotype.Component; @Component public class AdapterService { - /* - * @Autowired private UniversalEventAdapter snmpTrapEventAdapter; public - * GenericAdapter identifyIncomingJsonFormatAndReturnAdapter() { return - * snmpTrapEventAdapter; } - */ /** * Identifies eventype by parsing the incoming json file. @@ -52,7 +47,6 @@ public class AdapterService { */ public String identifyEventTypeFromIncomingJson(String incomingJsonString) throws MapperConfigException { - // TODO A proper logic to identify diffeent events is needed here return MapperConfigUtils.checkIncomingJsonForMatchingDomain(incomingJsonString); } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java deleted file mode 100644 index bf45a1b..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java +++ /dev/null @@ -1,41 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.service; - -import org.onap.universalvesadapter.exception.ConfigFileReadException; - -/** - * A contract defined for services that will handle the operations of config file. - * - * @author kmalbari - * - */ -public interface ConfigFileService { - - /** - * Returns the config file data. - * - * @param fileName file name - * @return config file content - * @throws ConfigFileReadException if unable to read config file - */ - String readConfigFile(String fileName) throws ConfigFileReadException; - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java index e463a28..04f333e 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java @@ -19,226 +19,96 @@ */ package org.onap.universalvesadapter.service; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.util.Collections; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration; +import org.onap.universalvesadapter.adapter.UniversalEventAdapter; +import org.onap.universalvesadapter.dmaap.Creator; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.exception.VesException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.MRPublisher.message; - -/** - * - * This service will handle all the communication with the DMaap MR API - * - * - * @author kmalbari - * - */ @Component public class DMaapService { - private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private DMaapMrUrlConfiguration dmaapMrUrlObject; - - private MRConsumer consumer; + private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + private static List<String> list = new LinkedList<String>(); + @Autowired + private UniversalEventAdapter eventAdapter; + + /** + * It fetches events from DMaap in JSON, transforms JSON to VES format and + * publishes it to outgoing DMaap MR Topic + * + * @param DMaaPMRSubscriber,DMaaPMRPublisher + * @return + */ + public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater) + throws InterruptedException { + LOGGER.info("fetch and publish from and to Dmaap started"); + + while (true) { + synchronized (this) { + for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) { + list.add(incomingJsonString); + + } + + if (list.size() == 0) { + Thread.sleep(2000); + } + LOGGER.debug("number of messages to be converted :{}", list.size()); + + if (list.size() != 0) { + String val = ((LinkedList<String>) list).removeFirst(); + List<String> messages = new ArrayList<>(); + String vesEvent = processReceivedJson(val); + if (!(vesEvent.isEmpty() || vesEvent.equals(null) || vesEvent.equals(""))) { + messages.add(vesEvent); + publisher.publish(messages); + LOGGER.info("Message successfully published to DMaaP Topic"); + } + + } + + } + } + } + + /** + * It finds mapping file for received json, transforms json to VES format + * + * @param incomingJsonString + * @return + */ + private String processReceivedJson(String incomingJsonString) { + String outgoingJsonString = null; + if (!"".equals(incomingJsonString)) { + + try { + /* + * For Future events String eventType = + * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); + * + * LOGGER.debug("Event identified as " + eventType); + */ + + outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp"); + + } catch (VesException exception) { + LOGGER.error("Received exception : " + exception.getMessage(), exception); + LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + } catch (DMaapException e) { + LOGGER.error("Received exception : ", e.getMessage()); + } + } + return outgoingJsonString; + } - private MRBatchingPublisher publisher; - - private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>(); - - /** - * Adds message to outgoing queue that will be sent to DMaaP topic. - * - * @param message outbound message in VES format - */ - public void addMessageInOutgoingQueue(String message) { - if (null != message && !"".equals(message)) { - outgoingMessageQueue.add(message); - eLOGGER.debug("Added message to outgoing queue " + message); - } - } - - - - /** - * reads the messages on DMaap MR Topic - * - * @return iterable of messages that will be received on DMaap MR Topic - * - * @throws DMaapException - */ - public Iterable<String> consumeFromDMaap() throws DMaapException{ - if(null == consumer){ - try { - consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); - eLOGGER.debug("Created consumer"); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception); - } - } - - try { - eLOGGER.debug("Returning result fetched by consumer"); - return consumer.fetch(); - } catch (Exception exception) { - throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception); - } -// return () -> Collections.emptyIterator(); - - } - - - /** - * sends the messages to DMaap MR Topic - * - * - * @throws DMaapException - */ - public void publishToDMaap() throws DMaapException{ - if(null == publisher){ - try { - publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - eLOGGER.debug("Create a publisher now."); - } catch (IOException exception) { - throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); - } - } - for(String message : outgoingMessageQueue){ - try { - publisher.send(message); - eLOGGER.debug("Sending message to DMaaP :-> " + message ); - } catch (IOException exception) { - throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); - } - } - List<message> stuck = null; - try { - stuck = publisher.close ( 20, TimeUnit.SECONDS ); - } catch (IOException | InterruptedException exception) { - throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); - } - if (null != stuck) { - if (stuck.size() > 0) { - eLOGGER.debug(stuck.size() + " messages unsent"); - } else { - eLOGGER.debug("Clean exit; all messages sent."); - } - } - else - throw new DMaapException("Problem while closing publisher, no messages were returned. "); - - } - - /** - * sends the messages to DMaap MR Topic - * - * - * @throws DMaapException - */ - public void publishToDMaap(String outgoingMessage) throws DMaapException{ - if(null == publisher){ - synchronized(publisher){ - if(null == publisher){ - try { - publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - eLOGGER.debug("Publisher created now."); - } catch (IOException exception) { - throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); - } - } - } - } - try { - publisher.send(outgoingMessage); - eLOGGER.debug("Sent outgoing message " + outgoingMessage); - } catch (IOException exception) { - throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); - } - List<message> stuck = null; - try { - stuck = publisher.close ( 20, TimeUnit.SECONDS ); - } catch (IOException | InterruptedException exception) { - throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); - } - if (null != stuck) { - if (stuck.size() > 0) { - eLOGGER.debug(stuck.size() + " messages unsent"); - } else { - eLOGGER.debug("Clean exit; all messages sent."); - } - } else{ - throw new DMaapException("Problem while closing publisher, no messages were returned. "); - } - - } - - - /** - * for local testing only - * @return - * @throws DMaapException - */ - /*public String consume() throws DMaapException { - - URL url; - StringBuffer incomingJson = null; - incomingJson = new StringBuffer(); - try { - url = new URL(dmaapMrUrlObject.getUrl()); - - //open the connection to the above URL. - URLConnection urlcon = url.openConnection(); - - Map<String, List<String>> header = urlcon.getHeaderFields(); - - //print all the fields along with their value. - for (Map.Entry<String, List<String>> mp : header.entrySet()) { - eLOGGER.debug(mp.getKey() + " : "); - eLOGGER.debug(mp.getValue().toString()); - } - eLOGGER.debug("Complete source code of the URL is-"); - eLOGGER.debug("---------------------------------"); - - //get the inputstream of the open connection. - BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream())); - String tempString; - //print the source code line by line. - while ((tempString = br.readLine()) != null) { - eLOGGER.debug(tempString); - incomingJson.append(tempString); - } - - } catch (MalformedURLException exception) { - throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception); - } catch (IOException exception) { - throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception); - } - return incomingJson.toString(); - }*/ - - - - - - - } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java deleted file mode 100644 index 7c05ced..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.service; - -import java.net.URI; -import java.net.URISyntaxException; - -import org.onap.universalvesadapter.configs.DiskRepoConfiguration; -import org.onap.universalvesadapter.exception.ConfigFileReadException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; - -/** - * Implementation of {@code ConfigFileService} using disk repository. - * - * @author kmalbari - * - */ -@Component -public class DiskRepoConfigFileService implements ConfigFileService { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private DiskRepoConfiguration diskRepoConfiguration; - - private RestTemplate restTemplate; - - private URI uri = null; - - /* (non-Javadoc) - * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String) - */ - @Override - public String readConfigFile(String fileName) throws ConfigFileReadException { - logger.debug("Reading config file for " + fileName); - if (null == uri) { - try { - uri = new URI(diskRepoConfiguration.getFileRepositoryUrl() + fileName); - logger.debug("Read URI for " + fileName); - } catch (URISyntaxException exception) { - throw new ConfigFileReadException("Unable to read config file for file " - + fileName + "\n Reason : " + exception.getMessage(), exception); - } - } - logger.debug("Calling file repo service for URI" + uri); - ResponseEntity<String> fileDataEntity = getRestTemplate().getForEntity(uri, String.class); - logger.debug("Call completed successfully"); - return fileDataEntity.getBody(); - } - - /** - * Instantiates the instance if null and returns it. - * - * @return {@code RestTemplate} instance - */ - private RestTemplate getRestTemplate(){ - - if (null == restTemplate) { - restTemplate = new RestTemplate(); - } - - return restTemplate; - } - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java deleted file mode 100644 index 77769f5..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java +++ /dev/null @@ -1,52 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.service; - -import org.onap.universalvesadapter.domain.ConfigFileData; -import org.springframework.stereotype.Component; - -/** - * Service to use mongo db as config file repository - * - * @author kmalbari - * - */ -@Component -public class MongoDbConfigFileService implements ConfigFileService { - - /* (non-Javadoc) - * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String) - */ - public String readConfigFile(String configFileName){ - //HERE CONFIG FILE DATA WOULD COME FROM MONGO DB - ConfigFileData configFileData = new ConfigFileData(); - configFileData.setXmlFileName(configFileName); - configFileData.setXmlContent("<?xml version=\"1.0\" encoding=\"UTF-8\"?> " - + "<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" " - + "xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" " - + " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> " - + " <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> " - + " </json:reader> " - + "</smooks-resource-list>"); - return configFileData.getXmlContent(); - } - - -} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java new file mode 100644 index 0000000..f92511e --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java @@ -0,0 +1,158 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.universalvesadapter.service; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.Ordered; +import org.springframework.stereotype.Component; + +//AdapterInitializer +@Component +public class VESAdapterInitializer implements CommandLineRunner, Ordered { + private static final Logger LOGGER = LoggerFactory.getLogger(VESAdapterInitializer.class); + @Value("${spring.datasource.url}") + String dBurl; + @Value("${spring.datasource.username}") + String user; + @Value("${spring.datasource.password}") + String pwd; + + private static Map<String, String> mappingFiles = new HashMap<String, String>(); + private static Map<String, String> env; + private static String url; + public static String retString; + public static String retCBSString; + + @Override + public void run(String... args) throws Exception { + + fetchMappingFile(); + getconsul(); + + } + + private void getconsul() { + + env = System.getenv(); + for (Map.Entry<String, String> entry : env.entrySet()) { + LOGGER.info(entry.getKey() + ":" + entry.getValue()); + } + + if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) { + LOGGER.info(">>>Dynamic configuration to be fetched from ConfigBindingService"); + url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE"); + + retString = executecurl(url); + + } else { + + + + LOGGER.info(">>>Static configuration to be used"); + + + } + + } + + private String executecurl(String url) { + + String[] command = { "curl", "-v", url }; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr); + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + LOGGER.info(result); + + reader.close(); + ipr.close(); + } catch (IOException e) { + LOGGER.error("error", e); + e.printStackTrace(); + } + return result; + + } + + public void fetchMappingFile() { + try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) { + LOGGER.info("Retrieving data from DB"); + PreparedStatement pstmt = con.prepareStatement("SELECT * FROM mapping_file"); + ResultSet rs = pstmt.executeQuery(); + // parsing the column each time is a linear search + int column1Pos = rs.findColumn("enterpriseid"); + int column2Pos = rs.findColumn("mappingfilecontents"); + String hexString; + while (rs.next()) { + String column1 = rs.getString(column1Pos); + String column2 = rs.getString(column2Pos); + hexString = column2.substring(2); + byte[] bytes = Hex.decodeHex(hexString.toCharArray()); + String data = new String(bytes, "UTF-8"); + mappingFiles.put(column1, data); + } + LOGGER.info("DB Initialization Completed..." + mappingFiles.size()); + } catch (Exception e) { + LOGGER.error("Error occured due to :" + e.getMessage()); + e.printStackTrace(); + } + + } + + public static Map<String, String> getMappingFiles() { + return mappingFiles; + } + + public static void setMappingFiles(Map<String, String> mappingFiles) { + VESAdapterInitializer.mappingFiles = mappingFiles; + } + + @Override + public int getOrder() { + return 1; + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index 112d1d6..ed590a8 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -19,28 +19,14 @@ */ package org.onap.universalvesadapter.service; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; - -import javax.annotation.Resource; - -import org.milyn.io.FileUtils; -import org.onap.universalvesadapter.adapter.GenericAdapter; -import org.onap.universalvesadapter.exception.ConfigFileReadException; -import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; -import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.dmaap.Creator; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; import org.onap.universalvesadapter.exception.MapperConfigException; -import org.onap.universalvesadapter.exception.VesException; -import org.onap.universalvesadapter.utils.MapperConfigUtils; -import org.onap.universalvesadapter.utils.ParallelTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; -import org.springframework.util.FileCopyUtils; /** * Service that starts the universal ves adapter module to listen for events @@ -50,128 +36,51 @@ import org.springframework.util.FileCopyUtils; */ @Component public class VesService { - - private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - private boolean isRunning = true; - - @Autowired - private ConfigurableApplicationContext ctx; - - @Autowired - private DMaapService dmaapService; - - @Autowired - private AdapterService adapterService; - - @Resource(name = "universalEventAdapter") - private GenericAdapter eventAdapter; - @Value("${messagesInBatch}") - private int messagesInBatch; + private final Logger LOGGER = LoggerFactory.getLogger(VesService.class); + + private boolean isRunning = true; + + @Autowired + private DMaapService dmaapService; + + @Autowired + private Creator creator; + - @Value("${messagesInTimeInterval}") - private long messagesInTimeInverval; - - @Value("${mapperConfig.file}") - private String mapperConfigFile; - - /*public void start(){ - - String incomingJsonString = dmaapService.consume(); - if(!"".equals(incomingJsonString)){ - GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter(); - String outgoingJsonString = eventAdapter.transform(incomingJsonString); - System.out.println(outgoingJsonString); - } - }*/ - - - /** - * method triggers universal ves adapter module. - */ - public void start() { - - try { - String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile)); - MapperConfigUtils.readMapperConfigFile(mappingConfigFileData); - - - ParallelTasks parallelTasks = new ParallelTasks(); - while (isRunning) { - int processingNumberOfMessage = 0; - long start = System.currentTimeMillis(); - for (String incomingJsonString : dmaapService.consumeFromDMaap()) { - parallelTasks.add(() -> processReceivedJson(incomingJsonString)); - processingNumberOfMessage++; - if (processingNumberOfMessage == messagesInBatch - || (System.currentTimeMillis() - start) > messagesInTimeInverval) { - processingNumberOfMessage = 0; - start = System.currentTimeMillis(); - try { - parallelTasks.startParallelTasks(); - } catch (InterruptedException exception) { - LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); - } - parallelTasks = new ParallelTasks(); - } - } - try { - parallelTasks.startParallelTasks(); - } catch (InterruptedException exception) { - LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); - } - parallelTasks = new ParallelTasks(); - } - - /*String incomingJsonString = ""; - incomingJsonString = dmaapService.consume(); - processReceivedJson(incomingJsonString);*/ - } catch (Exception exception) { - LOGGER.error("Reported exception : " + exception.getMessage(), exception); - } - } + /** + * method triggers universal VES adapter module. + */ + public void start() throws MapperConfigException { + LOGGER.debug("Creating Subcriber and Publisher with creator............."); + DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(); - /** - * It finds mapping file for received json, transforms json to VES format - * and publishes it to outgoing DMaap MR Topic - * - * @param incomingJsonString - */ - private void processReceivedJson(String incomingJsonString){ - LOGGER.debug("Received incoming message : " + incomingJsonString); - if (!"".equals(incomingJsonString)) { - String eventType; - try { - eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); - - LOGGER.debug("Event identified as " + eventType); - String outgoingJsonString; - outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType); - LOGGER.debug("Output VES json to be sent " + outgoingJsonString); - -// dmaapService.addMessageInOutgoingQueue(outgoingJsonString); -// LOGGER.debug("Added message in outgoing Queue "); - - dmaapService.publishToDMaap(outgoingJsonString); - LOGGER.debug("Sent message in outgoing Queue "); - - - } catch (VesException exception) { - LOGGER.error("Received exception : " + exception.getMessage(), exception); - - //TODO KKM : Do we wish to continue the application with same exception in every thread?? - LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); - ctx.close(); - } - } - } - - /** - * method stops universal ves adapter module - */ - public void stop() { - - isRunning = false; - } + DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(); + + // Create subscriber & publisher thread + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + try { + LOGGER.debug("starting subscriber & publisher thread:{}", Thread.currentThread().getName()); + dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + // Start subscriber & publisher thread + t1.setName("SNMP-COLLECTOR"); + t1.start(); + + } + + /** + * method stops universal ves adapter module + */ + public void stop() { + isRunning = false; + } } + diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java new file mode 100644 index 0000000..34bbc8e --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/DmaapConfig.java @@ -0,0 +1,288 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ +package org.onap.universalvesadapter.utils; + +import javax.validation.constraints.NotEmpty; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Component; + +@Component +@PropertySource(value = {"classpath:application.properties","classpath:DMaapMR.properties"}) +@ConfigurationProperties +public class DmaapConfig { + + @Value("${mr.hostname}") + @NotEmpty + private String hostname; + + // default port number + @Value("${mr.DEFAULT_PORT_NUMBER}") + @NotEmpty + private int DEFAULT_PORT_NUMBER; + + // default to no username + @Value("${mr.DEFAULT_USER_NAME}") + private String DEFAULT_USER_NAME; + + + //defaults to no userPassword + @Value("${mr.DEFAULT_USER_PASSWORD}") + private String DEFAULT_USER_PASSWORD; + + //defaults to using https protocol + @Value("${mr.DEFAULT_PROTOCOL}") + @NotEmpty + private String DEFAULT_PROTOCOL; + + //defaults to json content type + @Value("${mr.DEFAULT_CONTENT_TYPE}") + @NotEmpty + private String DEFAULT_CONTENT_TYPE; + + @Value("${mr.DMAAP_URI_PATH_PREFIX}") + @NotEmpty + private String DMAAP_URI_PATH_PREFIX; + + @Value("${mr.DMAAP_DEFAULT_CONSUMER_ID}") + @NotEmpty + private String DMAAP_DEFAULT_CONSUMER_ID; + + @Value("${mr.DMAAP_GROUP_PREFIX}") + @NotEmpty + private String DMAAP_GROUP_PREFIX; + + // Publisher Constants + + //Dmaap Publisher Topic + @Value("${mr.publisher.topic}") + @NotEmpty + private String publisherTopic; + + //disable batching by default + @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_BATCH_SIZE}") + @NotEmpty + private int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + + //default recovery messages size + @Value("${mr.publisher.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE}") + @NotEmpty + private int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + +//number of retries when flushing messages + @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}") + @NotEmpty + private int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + + //delay in retrying for flushing messages + @Value("${mr.publisher.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE}") + @NotEmpty + private int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + + // Subscriber Constants + + //Dmaap Subcriber Topic + @Value("${mr.subscriber.topic}") + @NotEmpty + private String subscriberTopic; + + @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_TIMEOUT_MS}") + @NotEmpty + private int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + + @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT}") + @NotEmpty + private int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + + @Value("${mr.subcriber.DEFAULT_SUBSCRIBER_GROUP_PREFIX}") + @NotEmpty + private String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + + @Value("${mr.subcriber.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME}") + @NotEmpty + private String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + + @Value("${mr.subcriber.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME}") + @NotEmpty + private String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public String getHostname() { + return hostname; + } + + public int getDEFAULT_PORT_NUMBER() { + return DEFAULT_PORT_NUMBER; + } + + public void setDEFAULT_PORT_NUMBER(int dEFAULT_PORT_NUMBER) { + DEFAULT_PORT_NUMBER = dEFAULT_PORT_NUMBER; + } + + public String getDEFAULT_USER_NAME() { + return DEFAULT_USER_NAME; + } + + public void setDEFAULT_USER_NAME(String dEFAULT_USER_NAME) { + DEFAULT_USER_NAME = dEFAULT_USER_NAME; + } + + public String getDEFAULT_USER_PASSWORD() { + return DEFAULT_USER_PASSWORD; + } + + public void setDEFAULT_USER_PASSWORD(String dEFAULT_USER_PASSWORD) { + DEFAULT_USER_PASSWORD = dEFAULT_USER_PASSWORD; + } + + public String getDEFAULT_PROTOCOL() { + return DEFAULT_PROTOCOL; + } + + public void setDEFAULT_PROTOCOL(String dEFAULT_PROTOCOL) { + DEFAULT_PROTOCOL = dEFAULT_PROTOCOL; + } + + public String getDEFAULT_CONTENT_TYPE() { + return DEFAULT_CONTENT_TYPE; + } + + public void setDEFAULT_CONTENT_TYPE(String dEFAULT_CONTENT_TYPE) { + DEFAULT_CONTENT_TYPE = dEFAULT_CONTENT_TYPE; + } + + public String getDMAAP_URI_PATH_PREFIX() { + return DMAAP_URI_PATH_PREFIX; + } + + public void setDMAAP_URI_PATH_PREFIX(String dMAAP_URI_PATH_PREFIX) { + DMAAP_URI_PATH_PREFIX = dMAAP_URI_PATH_PREFIX; + } + + public String getDMAAP_DEFAULT_CONSUMER_ID() { + return DMAAP_DEFAULT_CONSUMER_ID; + } + + public void setDMAAP_DEFAULT_CONSUMER_ID(String dMAAP_DEFAULT_CONSUMER_ID) { + DMAAP_DEFAULT_CONSUMER_ID = dMAAP_DEFAULT_CONSUMER_ID; + } + + public String getDMAAP_GROUP_PREFIX() { + return DMAAP_GROUP_PREFIX; + } + + public void setDMAAP_GROUP_PREFIX(String dMAAP_GROUP_PREFIX) { + DMAAP_GROUP_PREFIX = dMAAP_GROUP_PREFIX; + } + + public String getPublisherTopic() { + return publisherTopic; + } + + public void setPublisherTopic(String publisherTopic) { + this.publisherTopic = publisherTopic; + } + + public int getPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE() { + return publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + } + + public void setPublisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE(int publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE) { + this.publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE = publisherDEFAULT_PUBLISHER_MAX_BATCH_SIZE; + } + + public int getPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE() { + return publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + } + + public void setPublisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE( + int publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) { + this.publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = publisherDEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + } + + public int getPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE() { + return publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + } + + public void setPublisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE(int publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE) { + this.publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE = publisherPUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE; + } + + public int getPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE() { + return publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + } + + public void setPublisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE(int publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE) { + this.publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE = publisherPUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE; + } + + public String getSubscriberTopic() { + return subscriberTopic; + } + + public void setSubscriberTopic(String subscriberTopic) { + this.subscriberTopic = subscriberTopic; + } + + public int getSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS() { + return subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + } + + public void setSubcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS(int subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS) { + this.subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS = subcriberDEFAULT_SUBSCRIBER_TIMEOUT_MS; + } + + public int getSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT() { + return subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + } + + public void setSubcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT(int subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT) { + this.subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT = subcriberDEFAULT_SUBSCRIBER_MESSAGE_LIMIT; + } + + public String getSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX() { + return subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + } + + public void setSubcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX(String subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX) { + this.subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX = subcriberDEFAULT_SUBSCRIBER_GROUP_PREFIX; + } + + public String getSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME() { + return subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + } + + public void setSubcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME) { + this.subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME; + } + + public String getSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME() { + return subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + } + + public void setSubcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME(String subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME) { + this.subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME = subcriberSUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME; + } + +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java new file mode 100644 index 0000000..02632f4 --- /dev/null +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/HTTPUtils.java @@ -0,0 +1,62 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.universalvesadapter.utils; + +/** + * Contains common utils to check HTTP Related Utils + * + * @author Rajiv Singla . Creation Date: 11/2/2016. + */ +public abstract class HTTPUtils { + + /** + * HTTP Status code for successful HTTP call + */ + public static final Integer HTTP_SUCCESS_STATUS_CODE = 200; + + /** + * HTTP Response code when request has been accepted for processing, but the processing has not been completed + */ + public static final Integer HTTP_ACCEPTED_RESPONSE_CODE = 202; + + /** + * HTTP Response code when there is no content + */ + public static final Integer HTTP_NO_CONTENT_RESPONSE_CODE = 204; + + + public static final String JSON_APPLICATION_TYPE = "application/json"; + + + private HTTPUtils() { + + } + + /** + * Checks if HTTP Status code is less than or equal to 200 but less then 300 + * + * @param statusCode http status code + * @return true if response code between 200 and 300 + */ + public static boolean isSuccessfulResponseCode(Integer statusCode) { + return statusCode >= 200 && statusCode < 300; + } +} diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java index 4e67ed6..3d1907a 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/MapperConfigUtils.java @@ -19,18 +19,22 @@ */ package org.onap.universalvesadapter.utils; -import com.att.aft.dme2.internal.apache.commons.lang3.EnumUtils; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Set; import java.util.TreeSet; + import org.onap.universalvesadapter.exception.MapperConfigException; import org.onap.universalvesadapter.mappingconfig.Entry; import org.onap.universalvesadapter.mappingconfig.Evaluation; import org.onap.universalvesadapter.mappingconfig.MapperConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; /** + * This class will be used in Future for different telemetry data: * This class will be utility class to read the mapper config file and parse the * config to prepare the grammar to detect the incoming json's event type. * @@ -39,6 +43,7 @@ import org.onap.universalvesadapter.mappingconfig.MapperConfig; */ public class MapperConfigUtils { + private final Logger LOGGER = LoggerFactory.getLogger(MapperConfigUtils.class); private static Set<Entry> entries = new TreeSet<>((o1, o2) -> o1.getPriority().compareTo(o2.getPriority())); private enum JoinOperator { @@ -212,6 +217,7 @@ public class MapperConfigUtils { exception); } System.out.println("Read config file content into :" + config); + if (null != config) { entries.addAll(config.getEntries()); } else { diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java deleted file mode 100644 index 45fdf96..0000000 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/ParallelTasks.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DCAE -* ================================================================================ -* Copyright 2018 TechMahindra -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.universalvesadapter.utils; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * - * Utility class to execute parallel tasks - * - * @author kmalbari - * - */ -public class ParallelTasks -{ - private final Collection<Runnable> tasks = new ArrayList<Runnable>(); - - public ParallelTasks() - { - } - - /** - * - * Add task to be executed in parallel - * - * @param task - */ - public void add(final Runnable task) - { - tasks.add(task); - } - - /** - * starts all the added tasks in parallel - * - * @throws InterruptedException - */ - public void startParallelTasks() throws InterruptedException - { - final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() - .availableProcessors()); - try - { - final CountDownLatch latch = new CountDownLatch(tasks.size()); - for (final Runnable task : tasks) - threads.execute(new Runnable() { - public void run() - { - try - { - task.run(); - } - finally - { - latch.countDown(); - } - } - }); - latch.await(); - } - finally - { - threads.shutdown(); - } - } -}
\ No newline at end of file diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java index 8c17dc2..1c38783 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/SmooksUtils.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; * @author kmalbari * */ + + public class SmooksUtils { @@ -53,23 +55,19 @@ public class SmooksUtils { */ public static VesEvent getTransformedObjectForInput(Smooks smooks, String incomingJsonString) { - LOGGER.debug("Transforming json " + incomingJsonString); + LOGGER.info("Transforming incoming json " ); ExecutionContext executionContext = smooks.createExecutionContext(); - LOGGER.debug("Context created"); + LOGGER.info("Context created"); Locale defaultLocale = Locale.getDefault(); Locale.setDefault(new Locale("en", "IE")); StringResult result = new StringResult(); - // Configure the execution context to generate a report... -// executionContext.setEventListener(new HtmlReportGenerator("target/report/report.html")); - - // Filter the input message to the outputWriter, using the execution context... smooks.filterSource(executionContext, new StreamSource(new ByteArrayInputStream(incomingJsonString.getBytes(StandardCharsets.UTF_8))), result); - LOGGER.debug("Transformed incoming json now"); + Locale.setDefault(defaultLocale); VesEvent vesEvent = (VesEvent) executionContext.getBeanContext().getBean("vesEvent"); - LOGGER.debug("Converted vesEvent from incoming json"); + LOGGER.debug("consversion successful to VES Event"); return vesEvent; } |