diff options
Diffstat (limited to 'dcae-analytics-dmaap/src')
39 files changed, 4238 insertions, 0 deletions
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactory.java new file mode 100644 index 0000000..6966a46 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactory.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.module.AnalyticsDMaaPModule; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherFactory; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriberFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +/** + * Creates pre injected implementations for {@link DMaaPMRPublisher} and {@link DMaaPMRSubscriber} + * <p> + * Usage: + * <p>Create an instance of DMaaP MR Factory</p> + * <pre> + * DMaaPFactory dmaapFactory = DMaaPFactory.initalize() + * </pre> + * <p>Create a new DMaaP MR Publisher</p> + * <pre> + * DMaaPMRPublisher publisher = dmaapFactory.createPublisher(publisherConfig) + * </pre> + * <p>Create new DMaaP MR Subscriber</p> + * <pre> + * DMaaPMRSubscriber subscriber = dmaapFactory.createSubscriber(subscriberConfig) + * </pre> + * <p> + * <strong>All Clients must use this Factory to initalize DMaaP Message Router Publishers and Subscribers</strong> + * </p> + * <p> + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +public class DMaaPMRFactory { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRFactory.class); + + private final Injector injector; + + public DMaaPMRFactory(AbstractModule guiceModule) { + injector = Guice.createInjector(guiceModule); + } + + /** + * Returns configured instance of {@link DMaaPMRPublisher} + * + * @param publisherConfig Publisher Config + * @return configured instance of DMaaP MR Publisher + */ + public DMaaPMRPublisher createPublisher(@Nonnull DMaaPMRPublisherConfig publisherConfig) { + final DMaaPMRPublisherFactory publisherFactory = injector.getInstance(DMaaPMRPublisherFactory.class); + LOG.debug("Creating new DMaaP MR Publisher Instance with configuration: {}", publisherConfig); + final DMaaPMRPublisher dMaaPMRPublisher = publisherFactory.create(publisherConfig); + LOG.info("Created new DMaaP MR Publisher Instance. Publisher creation time: {}", + dMaaPMRPublisher.getPublisherCreationTime()); + return dMaaPMRPublisher; + } + + /** + * Returns configured instance of {@link DMaaPMRSubscriber} + * + * @param subscriberConfig Subscriber Config + * @return configured instance of DMaaP MR Subscriber + */ + public DMaaPMRSubscriber createSubscriber(@Nonnull DMaaPMRSubscriberConfig subscriberConfig) { + final DMaaPMRSubscriberFactory subscriberFactory = injector.getInstance(DMaaPMRSubscriberFactory.class); + LOG.debug("Creating new DMaaP MR Subscriber Instance with configuration: {}", subscriberConfig); + final DMaaPMRSubscriber dMaaPMRSubscriber = subscriberFactory.create(subscriberConfig); + LOG.info("Created new DMaaP MR Subscriber Instance. Subscriber creation time: {}", + dMaaPMRSubscriber.getSubscriberCreationTime()); + return dMaaPMRSubscriber; + } + + /** + * Creates an instance of {@link DMaaPMRFactory} + * + * @return {@link DMaaPMRFactory} factory instance + */ + public static DMaaPMRFactory create() { + final DMaaPMRFactory dMaaPMRFactory = new DMaaPMRFactory(new AnalyticsDMaaPModule()); + LOG.info("Created new instance of DMaaP MR Factory"); + return dMaaPMRFactory; + } + + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRBaseConfig.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRBaseConfig.java new file mode 100644 index 0000000..f2435f1 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRBaseConfig.java @@ -0,0 +1,193 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +import com.google.common.base.Objects; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; + +import static org.openecomp.dcae.analytics.common.utils.HTTPUtils.JSON_APPLICATION_TYPE; + +/** + * <p> + * Contains common parameters for both DMaaP Message Router Publisher and Subscriber Configs + * <p> + * @author Rajiv Singla. Creation Date: 10/12/2016. + */ +public abstract class DMaaPMRBaseConfig implements DMaaPMRConfig { + + protected static final Logger LOG = LoggerFactory.getLogger(DMaaPMRBaseConfig.class); + + protected String hostName; + protected Integer portNumber; + protected String topicName; + protected String protocol; + protected String userName; + protected String userPassword; + protected String contentType; + + /** + * Provides host name e.g. mrlocal-mtnjftle01.homer.com + * + * @return host name + */ + public String getHostName() { + return hostName; + } + + + /** + * Provides Port Number of DMaaP MR Topic Host. Defaults to 80 + * + * @return host port number + */ + public Integer getPortNumber() { + return portNumber; + } + + /** + * Provides topic name + * + * @return topic name + */ + public String getTopicName() { + return topicName; + } + + /** + * Provides protocol type e.g. http or https + * + * @return protocol type + */ + public String getProtocol() { + return protocol; + } + + /** + * Provides content type e.g. application/json + * + * @return content type + */ + public String getContentType() { + return contentType; + } + + + /** + * Provides User name for the DMaaP MR Topic authentication + * + * @return user name + */ + public String getUserName() { + return userName; + } + + /** + * Provides User password for the DMaaP MR Topic authentication + * + * @return user Password + */ + public String getUserPassword() { + return userPassword; + } + + + /** + * Trims, adjusts casing and validates user input String for protocol selection + * + * @param protocol - User input for protocol String + * @return - network protocol e.g http or https + */ + protected static String normalizeValidateProtocol(final String protocol) { + // validate that only http and https are supported protocols are Supported for DMaaP MR + String normalizedProtocolString = protocol.trim().toLowerCase(Locale.ENGLISH); + if (normalizedProtocolString.isEmpty() || + !(normalizedProtocolString.equals("http") || normalizedProtocolString.equals("https"))) { + + final String errorMessage = + "Unsupported protocol selection. Only HTTPS and HTTPS are currently supported for DMaaP MR"; + + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return normalizedProtocolString; + } + + + /** + * Trims, adjust casing and validates content type is supported by DMaaP. + * + * NOTE: DMaaP currently only support application/json content type + * + * @param contentType content type that needs to checked for DMaaP MR support + * @return true if content type is supported by DMaaP MR + */ + protected static String normalizeValidateContentType(final String contentType) { + // Current DMaaP MR is only supporting "application/json" content type + String normalizedContentType = contentType.trim().toLowerCase(Locale.ENGLISH); + final boolean isSupported = contentType.equals(JSON_APPLICATION_TYPE); + if (!isSupported) { + final String errorMessage = + "Unsupported content type selection. Only application/json is currently supported for DMaaP MR"; + + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return normalizedContentType; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DMaaPMRBaseConfig)) { + return false; + } + DMaaPMRBaseConfig that = (DMaaPMRBaseConfig) o; + return Objects.equal(hostName, that.hostName) && + Objects.equal(portNumber, that.portNumber) && + Objects.equal(topicName, that.topicName) && + Objects.equal(protocol, that.protocol) && + Objects.equal(userName, that.userName) && + Objects.equal(userPassword, that.userPassword) && + Objects.equal(contentType, that.contentType); + } + + @Override + public int hashCode() { + return Objects.hashCode(hostName, portNumber, topicName, protocol, userName, userPassword, contentType); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("hostName", hostName) + .add("portNumber", portNumber) + .add("topicName", topicName) + .add("protocol", protocol) + .add("userName", userName) + .add("contentType", contentType) + .toString(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRConfig.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRConfig.java new file mode 100644 index 0000000..f5ef71e --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRConfig.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +/** + * <p> + * Marker Interface for all DMaaP MR Configs. + * Holds Default configuration parameters for all Configs + * <p> + * @author Rajiv Singla. Creation Date: 10/12/2016. + */ +public interface DMaaPMRConfig { + + Integer DEFAULT_PORT_NUMBER = 80; // default port number + String DEFAULT_USER_NAME = null; // default to no username + String DEFAULT_USER_PASSWORD = null; // defaults to no userPassword + String DEFAULT_PROTOCOL = "https"; // defaults to using https protocol + String DEFAULT_CONTENT_TYPE = "application/json"; // defaults to json content type + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfig.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfig.java new file mode 100644 index 0000000..4a08cd3 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfig.java @@ -0,0 +1,250 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +import com.google.common.base.Objects; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + +/** + * <p> + * Immutable DMaaP MR Configuration for DMaaP MR Publisher. + * <p> + * Use {@link DMaaPMRPublisherConfig.Builder} to construct Subscriber Configuration + * </p> + * <p> + * @author Rajiv Singla. Creation Date: 10/12/2016. + */ +public class DMaaPMRPublisherConfig extends DMaaPMRBaseConfig { + + /** + * Publisher batching queue size + */ + private int maxBatchSize; + + /** + * Publisher Recovery Queue Size + */ + private int maxRecoveryQueueSize; + + + private DMaaPMRPublisherConfig(@Nonnull String hostName, + @Nonnull Integer portNumber, + @Nonnull String topicName, + @Nonnull String protocol, + @Nonnull String userName, + @Nonnull String userPassword, + @Nonnull String contentType, + int maxBatchSize, + int maxRecoveryQueueSize) { + this.hostName = hostName; + this.portNumber = portNumber; + this.topicName = topicName; + this.protocol = protocol; + this.userName = userName; + this.userPassword = userPassword; + this.contentType = contentType; + this.maxBatchSize = maxBatchSize; + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + } + + + /** + * Builder to initialize immutable {@link DMaaPMRPublisherConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private int maxBatchSize; + private int maxRecoveryQueueSize; + + public Builder(@Nonnull String hostName, @Nonnull String topicName) { + // required values + this.hostName = hostName; + this.topicName = topicName; + // Default values + this.portNumber = DEFAULT_PORT_NUMBER; + this.userName = DEFAULT_USER_NAME; + this.userPassword = DEFAULT_USER_PASSWORD; + this.protocol = DEFAULT_PROTOCOL; + this.contentType = DEFAULT_CONTENT_TYPE; + this.maxBatchSize = DEFAULT_PUBLISHER_MAX_BATCH_SIZE; + this.maxRecoveryQueueSize = DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + } + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + + /** + * Setup user name for authentication. If no username is provided authentication will be disabled + * + * @param userName user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + + /** + * Setup user password for authentication. If no password is provided authentication will be disabled + * + * @param userPassword user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + + /** + * Setup custom Publisher protocol - Defaults to https. + * Note: Only http and https are currently supported. + * + * @param protocol protocol e.g. https + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + + /** + * Setup custom Publisher content-type - Defaults to application/json + * + * @param contentType content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + + /** + * Setup custom Publisher Max Batch Size - Defaults to 100 + * + * @param maxBatchSize max Batch Size + * @return Builder object itself for chaining + */ + public Builder setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + + + /** + * Setup custom Maximum Recovery Queue Size. Recovery Queue is used to hold messages temporarily in case + * DMaaP MR Publisher topic is not responding for any reason. Defaults to 100,000 + * + * @param maxRecoveryQueueSize max recovery queue size + * @return Builder object itself for chaining + */ + public Builder setMaxRecoveryQueueSize(int maxRecoveryQueueSize) { + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + return this; + } + + /** + * Creates immutable instance of {@link DMaaPMRPublisherConfig} + * + * @return Builds and returns thread safe, immutable {@link DMaaPMRPublisherConfig} object + */ + public DMaaPMRPublisherConfig build() { + return new DMaaPMRPublisherConfig(hostName, portNumber, topicName, protocol, userName, userPassword, + contentType, maxBatchSize, maxRecoveryQueueSize); + } + + } + + + /** + * Returns max Publisher Batch Queue Size + * + * @return max Publisher Batch Queue size + */ + public int getMaxBatchSize() { + return maxBatchSize; + } + + /** + * Returns max Publisher Recovery Queue Size + * + * @return max Recovery Queue size + */ + public int getMaxRecoveryQueueSize() { + return maxRecoveryQueueSize; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRPublisherConfig that = (DMaaPMRPublisherConfig) o; + return maxBatchSize == that.maxBatchSize && + maxRecoveryQueueSize == that.maxRecoveryQueueSize; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), maxBatchSize, maxRecoveryQueueSize); + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("baseConfig", super.toString()) + .add("maxBatchSize", maxBatchSize) + .add("maxRecoveryQueueSize", maxRecoveryQueueSize) + .toString(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfig.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfig.java new file mode 100644 index 0000000..9741fa5 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfig.java @@ -0,0 +1,298 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +import com.google.common.base.Objects; + +import java.util.UUID; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_GROUP_PREFIX; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_TIMEOUT_MS; + +/** + * <p> + * Immutable DMaaP MR Configuration for Subscriber. + * <p> + * Use {@link DMaaPMRSubscriberConfig.Builder} to construct Subscriber Configuration + * <p> + * + * @author Rajiv Singla. Creation Date: 10/12/2016. + */ +public final class DMaaPMRSubscriberConfig extends DMaaPMRBaseConfig { + + private final String consumerId; + private final String consumerGroup; + private final Integer timeoutMS; + private final Integer messageLimit; + + private DMaaPMRSubscriberConfig(@Nonnull String hostName, + @Nonnull Integer portNumber, + @Nonnull String topicName, + @Nonnull String protocol, + @Nonnull String userName, + @Nonnull String userPassword, + @Nonnull String contentType, + @Nonnull String consumerId, + @Nonnull String consumerGroup, + @Nonnull Integer timeoutMS, + @Nonnull Integer messageLimit) { + this.hostName = hostName; + this.portNumber = portNumber; + this.topicName = topicName; + this.protocol = protocol; + this.userName = userName; + this.userPassword = userPassword; + this.contentType = contentType; + this.consumerId = consumerId; + this.consumerGroup = consumerGroup; + this.timeoutMS = timeoutMS; + this.messageLimit = messageLimit; + } + + /** + * Builder to initialize immutable {@link DMaaPMRSubscriberConfig} object + */ + public static class Builder { + + private String hostName; + private Integer portNumber; + private String topicName; + private String userName; + private String userPassword; + private String protocol; + private String contentType; + private String consumerId; + private String consumerGroup; + private Integer timeoutMS; + private Integer messageLimit; + + public Builder(@Nonnull String hostName, + @Nonnull String topicName) { + // Required Values + this.hostName = hostName; + this.topicName = topicName; + + // Default values + this.portNumber = DEFAULT_PORT_NUMBER; + this.userName = DEFAULT_USER_NAME; + this.userPassword = DEFAULT_USER_PASSWORD; + this.protocol = DEFAULT_PROTOCOL; + this.contentType = DEFAULT_CONTENT_TYPE; + this.consumerId = UUID.randomUUID().toString(); // consumer is assigned a random id by default + this.consumerGroup = DEFAULT_SUBSCRIBER_GROUP_PREFIX + consumerId; // random group is assigned + this.timeoutMS = DEFAULT_SUBSCRIBER_TIMEOUT_MS; // defaults to 10ms timeout + this.messageLimit = DEFAULT_SUBSCRIBER_MESSAGE_LIMIT; // defaults to 1000 message limit + } + + + /** + * Setup for custom host port number - Defaults to 80. + * + * @param portNumber custom port number + * @return Builder object itself for chaining + */ + public Builder setPortNumber(@Nonnull Integer portNumber) { + this.portNumber = portNumber; + return this; + } + + + /** + * Setup user name for authentication. If no username is provided authentication will be disabled + * + * @param userName user name for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserName(@Nonnull String userName) { + this.userName = userName; + return this; + } + + + /** + * Setup user password for authentication. If no password is provided authentication will be disabled + * + * @param userPassword user password for DMaaP Topic Authentication + * @return Builder object itself for chaining + */ + public Builder setUserPassword(@Nonnull String userPassword) { + this.userPassword = userPassword; + return this; + } + + + /** + * Setup custom Subscriber protocol - Defaults to https. + * Note: Only http and https are currently supported. + * + * @param protocol protocol e.g. https or http + * @return Builder object itself for chaining + */ + public Builder setProtocol(@Nonnull String protocol) { + + this.protocol = normalizeValidateProtocol(protocol); + return this; + } + + /** + * Setup custom Subscriber content-type - Defaults to application/json + * + * @param contentType content type e.g. application/json + * @return Builder object itself for chaining + */ + public Builder setContentType(@Nonnull String contentType) { + final String normalizedContentType = normalizeValidateContentType(contentType); + this.contentType = normalizedContentType; + return this; + } + + + /** + * Setup custom Consumer Id - Defaults to random Id + * + * @param consumerId - custom consumer ID + * @return Builder object itself for chaining + */ + public Builder setConsumerId(@Nonnull String consumerId) { + this.consumerId = consumerId; + return this; + } + + /** + * Setup custom Consumer Group - Default to OpenDCAE-DMaaPSub-ConsumerID + * + * @param consumerGroup - custom Consumer Group + * @return Builder object itself for chaining + */ + public Builder setConsumerGroup(@Nonnull String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + /** + * Setup Custom Subscriber timeout in ms - Default to no timeout limit + * + * @param timeoutMS timeout in milliseconds + * @return Builder object itself for chaining + */ + public Builder setTimeoutMS(@Nonnull Integer timeoutMS) { + this.timeoutMS = timeoutMS; + return this; + } + + /** + * Setup custom Subscriber Message Limit - Default to no limit + * + * @param messageLimit message Limit + * @return Builder object itself for chaining + */ + public Builder setMessageLimit(@Nonnull Integer messageLimit) { + this.messageLimit = messageLimit; + return this; + } + + /** + * Builds Immutable instance of {@link DMaaPMRSubscriberConfig} + * + * @return immutable DMaaP Subscriber Config Object + */ + public DMaaPMRSubscriberConfig build() { + return new DMaaPMRSubscriberConfig(hostName, portNumber, topicName, protocol, userName, userPassword, + contentType, consumerId, consumerGroup, timeoutMS, messageLimit); + } + + } + + + /** + * DMaaP MR Subscriber Consumer Id + * + * @return consumer Id + */ + public String getConsumerId() { + return consumerId; + } + + /** + * DMaaP MR Subscriber Consumer Group + * + * @return consumer group + */ + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * DMaaP MR Subscriber Timeout in ms + * + * @return subscriber timeout ms + */ + public Integer getTimeoutMS() { + return timeoutMS; + } + + /** + * DMaaP MR Subscriber message limit + * + * @return subscriber message limit + */ + public Integer getMessageLimit() { + return messageLimit; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DMaaPMRSubscriberConfig that = (DMaaPMRSubscriberConfig) o; + return Objects.equal(consumerId, that.consumerId) && + Objects.equal(consumerGroup, that.consumerGroup) && + Objects.equal(timeoutMS, that.timeoutMS) && + Objects.equal(messageLimit, that.messageLimit); + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), consumerId, consumerGroup, timeoutMS, messageLimit); + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("baseConfig", super.toString()) + .add("consumerId", consumerId) + .add("consumerGroup", consumerGroup) + .add("timeoutMS", timeoutMS) + .add("messageLimit", messageLimit) + .toString(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponse.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponse.java new file mode 100644 index 0000000..ad6a86f --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponse.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.response; + +/** + * <p> + * Contract for all DMaaPMR Publisher Response + * <p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public interface DMaaPMRPublisherResponse extends DMaaPMRResponse { + + + /** + * Gets number of pending messages + * + * @return pending messages in the batch queue + */ + int getPendingMessagesCount(); +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponseImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponseImpl.java new file mode 100644 index 0000000..dd1dfe8 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRPublisherResponseImpl.java @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.response; + +import com.google.common.base.Objects; + +import javax.annotation.Nonnull; + +/** + * <p> + * An simple implementation of {@link DMaaPMRPublisherResponse} + * <p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRPublisherResponseImpl implements DMaaPMRPublisherResponse { + + private final Integer responseCode; + private final String responseMessage; + private final int pendingMessagesCount; + + public DMaaPMRPublisherResponseImpl(@Nonnull Integer responseCode, + @Nonnull String responseMessage, + int pendingMessagesCount) { + this.responseCode = responseCode; + this.responseMessage = responseMessage; + this.pendingMessagesCount = pendingMessagesCount; + } + + @Override + public Integer getResponseCode() { + return responseCode; + } + + @Override + public String getResponseMessage() { + return responseMessage; + } + + @Override + public int getPendingMessagesCount() { + return pendingMessagesCount; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("responseCode", responseCode) + .add("responseMessage", responseMessage) + .add("pendingMessagesCount", pendingMessagesCount) + .toString(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRResponse.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRResponse.java new file mode 100644 index 0000000..7dc7523 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRResponse.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.response; + +/** + * <p> + * Contract for DMaaP MR Responses + * <p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public interface DMaaPMRResponse { + + /** + * Gets HTTP Response Code + * + * @return HTTP Response code as String + */ + Integer getResponseCode(); + + /** + * Gets Response Message + * + * @return Response Message + */ + String getResponseMessage(); + + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponse.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponse.java new file mode 100644 index 0000000..fd1a9e3 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponse.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.response; + +import java.util.List; + +/** + * <p> + * Contract for all DMaaP MR Subscriber Responses + * </p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public interface DMaaPMRSubscriberResponse extends DMaaPMRResponse { + + /** + * Returns message fetched from DMaaP MR Topic + * + * @return collection of actual message retrieved from DMaaP MR Topic + */ + List<String> getFetchedMessages(); + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponseImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponseImpl.java new file mode 100644 index 0000000..1254654 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/domain/response/DMaaPMRSubscriberResponseImpl.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.response; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static java.util.Collections.unmodifiableList; + +/** + * <p> + * A simple implementation for {@link DMaaPMRSubscriberResponse} + * <p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRSubscriberResponseImpl implements DMaaPMRSubscriberResponse { + + private final Integer responseCode; + private final String responseMessage; + private final List<String> fetchedMessages; + + public DMaaPMRSubscriberResponseImpl(@Nonnull Integer responseCode, + @Nonnull String responseMessage, + @Nullable List<String> fetchedMessages) { + this.responseCode = responseCode; + this.responseMessage = responseMessage; + this.fetchedMessages = fetchedMessages != null ? fetchedMessages : ImmutableList.<String>of(); + } + + public DMaaPMRSubscriberResponseImpl(Integer responseCode, String responseMessage) { + this(responseCode, responseMessage, null); + } + + @Override + public Integer getResponseCode() { + return responseCode; + } + + @Override + public String getResponseMessage() { + return responseMessage; + } + + @Override + public List<String> getFetchedMessages() { + return unmodifiableList(fetchedMessages); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("responseCode", responseCode) + .add("responseMessage", responseMessage) + .add("fetchedMessages(size)", fetchedMessages.size()) + .toString(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPModule.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPModule.java new file mode 100644 index 0000000..1296798 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPModule.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.module; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherFactory; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherImpl; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueueFactory; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueueImpl; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriberFactory; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriberImpl; + +/** + * Guice Module to wire concrete implementations with interfaces + * <p> + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +public class AnalyticsDMaaPModule extends AbstractModule { + + + @Override + protected void configure() { + + // Bind Http Client + bind(CloseableHttpClient.class).toInstance(HttpClients.createDefault()); + + // Bind Publishing queue + install(new FactoryModuleBuilder().implement(DMaaPMRPublisherQueue.class, DMaaPMRPublisherQueueImpl.class) + .build(DMaaPMRPublisherQueueFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRPublisher.class, DMaaPMRPublisherImpl.class) + .build(DMaaPMRPublisherFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRSubscriber.class, DMaaPMRSubscriberImpl.class) + .build(DMaaPMRSubscriberFactory.class)); + + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponent.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponent.java new file mode 100644 index 0000000..e7e8ea4 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponent.java @@ -0,0 +1,356 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.dcae.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.common.utils.HTTPUtils; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.LinkedList; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static java.lang.String.format; + +/** + * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods + * + * @author Rajiv Singla. Creation Date: 11/1/2016. + */ +public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent { + + private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class); + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates Base64 encoded Auth Header for given userName and Password + * If either user name of password are null return absent + * + * @param userName username + * @param userPassword user password + * @return base64 encoded auth header if username or password are both non null + */ + protected static Optional<String> getAuthHeader(final @Nullable String userName, + final @Nullable String userPassword) { + if (userName == null || userPassword == null) { + return Optional.absent(); + } else { + final String auth = userName + ":" + userPassword; + final Charset isoCharset = Charset.forName("ISO-8859-1"); + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset)); + return Optional.of("Basic " + new String(encodedAuth, isoCharset)); + } + } + + + /** + * Creates Publisher URI for given {@link DMaaPMRPublisherConfig} + * + * @param publisherConfig publisher settings + * + * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic + */ + protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) { + final String hostName = publisherConfig.getHostName(); + final Integer portNumber = publisherConfig.getPortNumber(); + final String getProtocol = publisherConfig.getProtocol(); + final String topicName = publisherConfig.getTopicName(); + URI publisherURI = null; + try { + publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) + .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build(); + } catch (URISyntaxException e) { + final String errorMessage = format("Error while creating publisher URI: %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI); + return publisherURI; + } + + + /** + * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig} + * + * @param subscriberConfig subscriber settings + * + * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic + */ + protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) { + final String hostName = subscriberConfig.getHostName(); + final Integer portNumber = subscriberConfig.getPortNumber(); + final String getProtocol = subscriberConfig.getProtocol(); + final String topicName = subscriberConfig.getTopicName(); + final String consumerId = subscriberConfig.getConsumerId(); + final String consumerGroup = subscriberConfig.getConsumerGroup(); + final Integer timeoutMS = subscriberConfig.getTimeoutMS(); + final Integer messageLimit = subscriberConfig.getMessageLimit(); + URI subscriberURI = null; + try { + URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) + .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + + topicName + "/" + + consumerGroup + "/" + + consumerId); + // add query params if present + if (timeoutMS > 0) { + uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString()); + } + if (messageLimit > 0) { + uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME, + messageLimit.toString()); + } + subscriberURI = uriBuilder.build(); + + } catch (URISyntaxException e) { + final String errorMessage = format("Error while creating subscriber URI: %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI); + return subscriberURI; + } + + + /** + * Creates 202 (Accepted) Response code message + * + * @param batchQueueSize batch Queue size + * + * @return response with 202 message code + */ + protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) { + return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE, + "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize); + } + + + /** + * Creates 204 (No Content) Response code message + * + * @return response with 204 message code + */ + protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() { + return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE, + "No Content - No Messages in batch queue for flushing to MR Topic", 0); + } + + + /** + * Creates Publisher Response for given response code, response Message and pending Message Count + * + * @param responseCode HTTP Status Code + * @param responseMessage response message + * @param pendingMessages pending messages in batch queue + * + * @return DMaaP MR Publisher Response + */ + protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String + responseMessage, int pendingMessages) { + return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages); + } + + + /** + * Returns weekly consistent pending messages in batch queue + * + * @param publisherQueue batch queue + * @param publisherConfig publisher settings + * + * @return pending messages to be published + */ + protected static int getPendingMessages(final @Nonnull DMaaPMRPublisherQueue publisherQueue, + final @Nonnull DMaaPMRPublisherConfig publisherConfig) { + return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize(); + } + + + /** + * Creates Subscriber Response for give response Code, response Message and fetch messages + * + * @param responseCode response Code + * @param responseMessage response Message + * @param fetchedMessages fetched messages + * + * @return DMaaP MR Subscriber Response + */ + protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String + responseMessage, List<String> fetchedMessages) { + if (fetchedMessages == null) { + return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage); + } else { + return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages); + } + } + + + /** + * Custom response handler which extract status code and response body + * + * @return Pair containing Response code and response body + */ + protected static ResponseHandler<Pair<Integer, String>> responseHandler() { + return new ResponseHandler<Pair<Integer, String>>() { + @Override + public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException { + // Get Response status code + final int status = response.getStatusLine().getStatusCode(); + final HttpEntity responseEntity = response.getEntity(); + // If response entity is not null - extract response body as string + String responseEntityString = ""; + if (responseEntity != null) { + responseEntityString = EntityUtils.toString(responseEntity); + } + return new ImmutablePair<>(status, responseEntityString); + } + }; + } + + + /** + * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will + * be lost + * + * @param publisherQueue publisher queue + * @param messages recoverable messages to be published to recovery queue + */ + protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue, + List<String> messages) { + try { + publisherQueue.addRecoverableMessages(messages); + + LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}", + messages.size(), publisherQueue.getBatchQueueRemainingSize()); + + } catch (IllegalStateException e) { + final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " + + "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d", + messages.size(), publisherQueue.getRecoveryQueueRemainingSize()); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + + /** + * Converts List of messages to Json String Array which can be published to DMaaP MR topic. + * + * @param messages messages that need to parsed to Json Array representation + * @return json string representation of message + */ + protected static String convertToJsonString(final @Nullable List<String> messages) { + // If messages are null or empty just return empty array + if (messages == null || messages.size() == 0) { + return "[]"; + } + + + List<JsonNode> jsonMessageObjectsList = new LinkedList<>(); + + try { + for (String message : messages) { + final JsonNode jsonNode = objectMapper.readTree(message); + jsonMessageObjectsList.add(jsonNode); + } + return objectMapper.writeValueAsString(jsonMessageObjectsList); + } catch (JsonProcessingException e) { + final String errorMessage = + format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s", + messages, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + + } catch (IOException e) { + final String errorMessage = + format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s", + messages, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + + /** + * Converts subscriber messages json string to List of messages. If message Json String is empty + * or null + * + * @param messagesJsonString json messages String + * + * @return List containing DMaaP MR Messages + */ + protected static List<String> convertJsonToStringMessages(final @Nullable String messagesJsonString) { + + final LinkedList<String> messages = new LinkedList<>(); + + // If message string is not null or not empty parse json message array to List of string messages + if (!(messagesJsonString == null) && !messagesJsonString.trim().isEmpty() + && !messagesJsonString.trim().equals("[]")) { + + try { + final List messageList = objectMapper.readValue(messagesJsonString, List.class); + for (Object message : messageList) { + final String jsonMessageString = objectMapper.writeValueAsString(message); + if (jsonMessageString.startsWith("\"") && jsonMessageString.endsWith("\"")) { + final String jsonSubString = jsonMessageString.substring(1, jsonMessageString.length() - 1); + messages.add(StringEscapeUtils.unescapeJson(jsonSubString)); + } else { + messages.add(StringEscapeUtils.unescapeJson(jsonMessageString)); + } + } + + } catch (IOException e) { + final String errorMessage = + format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," + + " Json Error: %s", messagesJsonString, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + } + return messages; + } + + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/DMaaPMRComponent.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/DMaaPMRComponent.java new file mode 100644 index 0000000..6f9a0ce --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/DMaaPMRComponent.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service; + +/** + * Marker interface for all DMaaP MR Components e.g. MR Publishers, MR Subscribers + * + * @author Rajiv Singla. Creation Date: 11/1/2016. + */ +public interface DMaaPMRComponent { +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisher.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisher.java new file mode 100644 index 0000000..afbdf9f --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisher.java @@ -0,0 +1,96 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; + +import java.util.Date; +import java.util.List; + +/** + * <p> + * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics. + * <p> + * + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public interface DMaaPMRPublisher extends AutoCloseable { + + + /** + * <p> + * Adds collection of messages to DMaaP MR Topic Publishing Queue. + * <p> + * Note: Invoking this method may or may not cause publishing immediately + * as publishing in done is batch mode by default. Parameter maxBatchSize + * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size. + * If the maxBatchSize is reached all message will be published automatically + * during subsequent call. + * </p> + * + * @param messages messages to publish to DMaaP MR Publisher + * @return response which may contain Http Response code 202 (Accepted) as publishing + * will proceed when max batch size is reached + * + * @throws DCAEAnalyticsRuntimeException DCAEAnalyticsRuntimeException + */ + DMaaPMRPublisherResponse publish(List<String> messages) throws DCAEAnalyticsRuntimeException; + + + /** + * <p> + * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse} + * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic. + * </p> + * + * @param messages messages to publish to DMaaP MR Publisher + * @return DMaaP Message Router Publisher Response + * + * @throws DCAEAnalyticsRuntimeException DCAEAnalyticsRuntimeException + */ + DMaaPMRPublisherResponse forcePublish(List<String> messages) throws DCAEAnalyticsRuntimeException; + + + /** + * <p> + * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns + * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to + * be flushed response code 304 (Not Modified) will be returned + * </p> + * + * @return DMaaP Message Router Publisher Response + */ + DMaaPMRPublisherResponse flush(); + + + /** + * <p> + * Returns the creation time when Publisher instance was created. + * <p> + * + * @return creation time of Subscriber instance + */ + Date getPublisherCreationTime(); + + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java new file mode 100644 index 0000000..74245f8 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import org.openecomp.dcae.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; + +/** + * <p> + * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes. + * <p> + * <strong> + * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they + * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize + * guice injected Publisher instances + * </strong> + * <p> + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +public interface DMaaPMRPublisherFactory { + + DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig); + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java new file mode 100644 index 0000000..425bead --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java @@ -0,0 +1,211 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpHeaders; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.openecomp.dcae.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.service.BaseDMaaPMRComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.List; + +import static org.openecomp.dcae.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode; +import static java.lang.String.format; + +/** + * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient} + * + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class); + + private final DMaaPMRPublisherConfig publisherConfig; + private final CloseableHttpClient closeableHttpClient; + private final DMaaPMRPublisherQueue publisherQueue; + private final Date publisherCreationTime; + private URI publisherUri; + + @Inject + public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig, + DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory, + CloseableHttpClient closeableHttpClient) { + + this.publisherConfig = publisherConfig; + this.publisherQueue = dMaaPMRPublisherQueueFactory.create( + publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize()); + this.closeableHttpClient = closeableHttpClient; + this.publisherUri = createPublisherURI(publisherConfig); + this.publisherCreationTime = new Date(); + } + + + @Override + public DMaaPMRPublisherResponse publish(List<String> messages) throws DCAEAnalyticsRuntimeException { + + final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize(); + + // if messages size is less than batch queue size - just queue them for batch publishing + if (batchQueueRemainingSize > messages.size()) { + LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}", + messages.size(), batchQueueRemainingSize); + final int batchQueueSize = publisherQueue.addBatchMessages(messages); + return createPublisherAcceptedResponse(batchQueueSize); + + } else { + + // grab all already queued messages, append current messages and force publish them to DMaaP MR topic + final List<String> queueMessages = publisherQueue.getMessageForPublishing(); + LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " + + "Publisher Topic."); + final DMaaPMRPublisherResponse forcePublishResponse = + forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages))); + return forcePublishResponse; + } + + } + + @Override + public DMaaPMRPublisherResponse forcePublish(List<String> messages) throws DCAEAnalyticsRuntimeException { + + LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size()); + + final String contentType = publisherConfig.getContentType(); + final String userName = publisherConfig.getUserName(); + final String userPassword = publisherConfig.getUserPassword(); + final HttpPost postRequest = new HttpPost(publisherUri); + + // add Authorization Header if username and password are present + final Optional<String> authHeader = getAuthHeader(userName, userPassword); + if (authHeader.isPresent()) { + postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); + } else { + LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present."); + } + + // Create post string entity + final String messagesJson = convertToJsonString(messages); + final StringEntity requestEntity = + new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8")); + postRequest.setEntity(requestEntity); + + try { + final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler()); + final Integer responseCode = responsePair.getLeft(); + final String responseBody = responsePair.getRight(); + // if messages were published successfully, return successful response + if (isSuccessfulResponseCode(responseCode)) { + LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " + + "Body: {}, Number of Messages published: {}", + responseCode, responseBody, messages.size()); + + } else { + LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " + + "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody); + addMessagesToRecoveryQueue(publisherQueue, messages); + } + + return createPublisherResponse(responseCode, responseBody, + getPendingMessages(publisherQueue, publisherConfig)); + + } catch (IOException e) { + // If IO Error then we need to also put messages in recovery queue + addMessagesToRecoveryQueue(publisherQueue, messages); + final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " + + "Messages will be queued in recovery queue. Messages Size: %d", messages.size()); + + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + } + + + @Override + public DMaaPMRPublisherResponse flush() { + final List<String> queueMessages = publisherQueue.getMessageForPublishing(); + // If there are no message return 204 (No Content) response code + if (queueMessages.size() == 0) { + LOG.debug("No messages to publish to batch queue. Returning 204 status code"); + return createPublisherNoContentResponse(); + } else { + // force publish messages in queue + return forcePublish(queueMessages); + } + } + + @Override + public Date getPublisherCreationTime() { + return new Date(publisherCreationTime.getTime()); + } + + @Override + public void close() throws Exception { + + // flush current message in the queue + int retrialNumber = 0; + int flushResponseCode; + + // automatic retries if messages cannot be flushed + do { + retrialNumber++; + DMaaPMRPublisherResponse flushResponse = flush(); + flushResponseCode = flushResponse.getResponseCode(); + + if (!isSuccessfulResponseCode(flushResponseCode)) { + LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " + + "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber, + AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE); + + Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE); + } + } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE && + !isSuccessfulResponseCode(flushResponseCode)); + + if (!isSuccessfulResponseCode(flushResponseCode)) { + LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented"); + } else { + LOG.info("Successfully published all batched messages to publisher."); + } + + // close http client + closeableHttpClient.close(); + + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java new file mode 100644 index 0000000..cb3c113 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import java.util.List; + +/** + * <p> + * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic + * is offline for some reason. It does so by having a recovery queue which keeps + * messages in order in case there is temporary interruption in DMaaP Publisher + * </p> + * + * @author Rajiv Singla. Creation Date: 11/1/2016. + */ +public interface DMaaPMRPublisherQueue { + + /** + * <p> + * Add batchMessages to Batch Queue + * </p> + * + * @param batchMessages messages that needs to be added to batch queue + * @return current size of batch queue + * + * @throws IllegalStateException if batch queue does not have enough space + */ + int addBatchMessages(List<String> batchMessages) throws IllegalStateException; + + + /** + * <p> + * Add recoverable messages to Recoverable Queue + * </p> + * + * @param recoverableMessages messages that needs to be added to recoverable queue + * @return current size of the recoverable queue + * + * @throws IllegalStateException if recoverable queue does not have enough space + */ + int addRecoverableMessages(List<String> recoverableMessages) throws IllegalStateException; + + /** + * <p> + * Get messages that need to be published to DMaaP topic. Messages in recoverable + * queue are appended if present. + * </p> + * + * @return List of messages from both batch and recovery queue + */ + List<String> getMessageForPublishing(); + + /** + * <p> + * Remaining capacity of Batch Queue + * </p> + * + * @return Remaining Batch Queue Size + */ + int getBatchQueueRemainingSize(); + + /** + * <p> + * Remaining capacity of Recovery Queue + * </p> + * + * @return Remaining Recovery Queue Size + */ + int getRecoveryQueueRemainingSize(); + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java new file mode 100644 index 0000000..bfc3dab --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import com.google.inject.assistedinject.Assisted; + +/** + * <p> + * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes. + * <p> + * + * @author Rajiv Singla. Creation Date: 11/1/2016. + */ +public interface DMaaPMRPublisherQueueFactory { + + DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize, + @Assisted("recoveryQueueSize") int recoveryQueueSize); + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java new file mode 100644 index 0000000..0367950 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java @@ -0,0 +1,124 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; + +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Lists.newLinkedList; +import static java.util.Collections.unmodifiableList; + +/** + * <p> + * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque} + * for batch and recovery queues + * </p> + * + * + * @author Rajiv Singla. Creation Date: 11/1/2016. + */ +public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class); + + private final LinkedBlockingDeque<String> batchQueue; + private final LinkedBlockingDeque<String> recoveryQueue; + + @Inject + public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize, + @Assisted("recoveryQueueSize") int recoveryQueueSize) { + batchQueue = new LinkedBlockingDeque<>(batchQueueSize); + recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize); + LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}", + batchQueueSize, recoveryQueueSize); + } + + @Override + public synchronized int addBatchMessages(List<String> batchMessages) throws IllegalStateException { + + // checks if batchMessages size does not exceed batch queue capacity + if (batchMessages.size() > batchQueue.remainingCapacity()) { + throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue"); + } + + // Add batchMessages to batch queue + for (String message : batchMessages) { + batchQueue.add(message); + } + + // returns current elements size in batch queue + return batchQueue.size(); + } + + @Override + public synchronized int addRecoverableMessages(List<String> recoverableMessages) throws IllegalStateException { + + // checks if messages size does not exceed recovery queue size + if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) { + throw new IllegalStateException("Not enough capacity to add messages in recovery queue"); + } + + // add messages to recovery queue + for (String recoverableMessage : recoverableMessages) { + recoveryQueue.add(recoverableMessage); + } + + // returns current size of recovery queue + return recoveryQueue.size(); + } + + @Override + public synchronized List<String> getMessageForPublishing() { + + final List<String> recoveryMessageList = new LinkedList<>(); + final List<String> batchMessagesList = new LinkedList<>(); + + // get messages from recovery queue if present + if (recoveryQueue.size() > 0) { + recoveryQueue.drainTo(recoveryMessageList); + } + + // get messages from batch queue if present + if (batchQueue.size() > 0) { + batchQueue.drainTo(batchMessagesList); + } + + // concat recovery and batch queue elements + return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList))); + } + + @Override + public synchronized int getBatchQueueRemainingSize() { + return batchQueue.remainingCapacity(); + } + + @Override + public synchronized int getRecoveryQueueRemainingSize() { + return recoveryQueue.remainingCapacity(); + } +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java new file mode 100644 index 0000000..096c0a9 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.subscriber; + +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; + +import java.util.Date; + +/** + * <p> + * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics. + * <p> + * + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public interface DMaaPMRSubscriber extends AutoCloseable { + + /** + * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters + * for messageLimit and message timeout are used + * + * @return DMaaP Message Router Subscriber Response + * + * @throws DCAEAnalyticsRuntimeException DCAE Analytics Exception + */ + DMaaPMRSubscriberResponse fetchMessages() throws DCAEAnalyticsRuntimeException; + + + /** + * Returns the Subscriber instance creation time + * <p> + * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which + * are published to the topic after the creation of the Subscriber. + * + * @return creation time of Subscriber instance + */ + Date getSubscriberCreationTime(); + + +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java new file mode 100644 index 0000000..33c405e --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.subscriber; + +import org.openecomp.dcae.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; + +/** + * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes. + * <p> + * <strong> + * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they + * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize + * guice injected Subscriber instances + * </strong> + * <p> + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +public interface DMaaPMRSubscriberFactory { + + DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig); +} diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java new file mode 100644 index 0000000..6d98189 --- /dev/null +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java @@ -0,0 +1,129 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.subscriber; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpHeaders; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.service.BaseDMaaPMRComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import static org.openecomp.dcae.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode; +import static java.lang.String.format; + +/** + * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient} + * + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class); + + private final DMaaPMRSubscriberConfig subscriberConfig; + private final CloseableHttpClient closeableHttpClient; + private final URI subscriberUri; + private final Date subscriberCreationTime; + + @Inject + public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig, + CloseableHttpClient closeableHttpClient) { + this.subscriberConfig = subscriberConfig; + this.closeableHttpClient = closeableHttpClient; + this.subscriberUri = createSubscriberURI(subscriberConfig); + this.subscriberCreationTime = new Date(); + } + + @Override + public DMaaPMRSubscriberResponse fetchMessages() throws DCAEAnalyticsRuntimeException { + + final String userName = subscriberConfig.getUserName(); + final String userPassword = subscriberConfig.getUserPassword(); + + final HttpGet getRequest = new HttpGet(subscriberUri); + + // add Authorization Header if username and password are present + final Optional<String> authHeader = getAuthHeader(userName, userPassword); + if (authHeader.isPresent()) { + getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); + } else { + LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present."); + } + + try { + + final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler()); + final Integer responseCode = responsePair.getLeft(); + final String responseBody = responsePair.getRight(); + + List<String> fetchedMessages = new LinkedList<>(); + String responseMessage = responseBody; + + // if messages were published successfully, return successful response + if (isSuccessfulResponseCode(responseCode)) { + if (responseBody != null) { + fetchedMessages = convertJsonToStringMessages(responseBody); + responseMessage = "Messages Fetched Successfully"; + } else { + responseMessage = "DMaaP Response Body had no messages"; + } + } else { + LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " + + "DMaaP Response Body: {}", responseCode, responseBody); + } + + return createSubscriberResponse(responseCode, responseMessage, fetchedMessages); + + } catch (IOException e) { + + final String errorMessage = + format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + + } + + @Override + public Date getSubscriberCreationTime() { + return new Date(subscriberCreationTime.getTime()); + } + + @Override + public void close() throws Exception { + closeableHttpClient.close(); + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPGuiceUnitTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPGuiceUnitTest.java new file mode 100644 index 0000000..522b057 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPGuiceUnitTest.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap; + +import org.junit.runner.RunWith; +import org.openecomp.dcae.analytics.test.runner.GuiceJUnitRunner; + +/** + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +@RunWith(GuiceJUnitRunner.class) +public abstract class BaseAnalyticsDMaaPGuiceUnitTest extends BaseAnalyticsDMaaPUnitTest { +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPUnitTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPUnitTest.java new file mode 100644 index 0000000..75e3ec4 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/BaseAnalyticsDMaaPUnitTest.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap; + +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.test.BaseDCAEAnalyticsUnitTest; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.of; + +/** + * @author Rajiv Singla. Creation Date: 10/14/2016. + */ +public abstract class BaseAnalyticsDMaaPUnitTest extends BaseDCAEAnalyticsUnitTest { + + // Unit Test Settings + protected static final String HOST_NAME = "testHostName"; + protected static final Integer PORT_NUMBER = 8080; + protected static final String TOPIC_NAME = "testTopicName"; + protected static final String USERNAME = "testUserName"; + protected static final String PASSWORD = "testPassword"; + protected static final String HTTP_PROTOCOL = "https"; + protected static final String CONTENT_TYPE = "application/json"; + + protected static final int PUBLISHER_MAX_BATCH_QUEUE_SIZE = 200; + protected static final int PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = 2000; + + protected static final String SUBSCRIBER_CONSUMER_ID = "123"; + protected static final String SUBSCRIBER_CONSUMER_GROUP_NAME = "testGonsumerName-" + SUBSCRIBER_CONSUMER_ID; + protected static final int SUBSCRIBER_TIMEOUT_MS = 2000; + protected static final int SUBSCRIBER_MESSAGE_LIMIT = 20; + + /** + * Creates Sample Publisher settings for unit testing purposes + * + * @return sample publisher settings for testing + */ + protected static DMaaPMRPublisherConfig getPublisherConfig() { + return new DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setContentType(CONTENT_TYPE) + .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build(); + } + + /** + * Creates Sample Subscriber settings for unit testing purposes + * + * @return sample subscriber settings for testing + */ + protected static DMaaPMRSubscriberConfig getSubscriberConfig(String consumerId, String consumerGroup) { + return new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setConsumerGroup(consumerGroup != null ? consumerGroup : SUBSCRIBER_CONSUMER_GROUP_NAME) + .setConsumerId(consumerId != null ? consumerId : SUBSCRIBER_CONSUMER_ID) + .setTimeoutMS(SUBSCRIBER_TIMEOUT_MS) + .setMessageLimit(SUBSCRIBER_MESSAGE_LIMIT).build(); + } + + /** + * Creates two sample message for publishing + * + * @return sample publish message list + */ + protected static List<String> getTwoSampleMessages() { + String message1 = "{ \"message\" : \"Test Message1\"}"; + String message2 = "{ \"message\" : \"Test Message2\"}"; + return of(message1, message2); + } + + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactoryTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactoryTest.java new file mode 100644 index 0000000..646b48d --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/DMaaPMRFactoryTest.java @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap; + +import org.junit.Before; +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.module.AnalyticsDMaaPTestModule; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.analytics.test.annotation.GuiceModules; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Manjesh Gowda. Creation Date: 11/7/2016. + */ +@GuiceModules(AnalyticsDMaaPTestModule.class) +public class DMaaPMRFactoryTest extends BaseAnalyticsDMaaPGuiceUnitTest { + + private DMaaPMRFactory dmaapMRFactory; + + @Before + public void setUp() throws Exception { + dmaapMRFactory = new DMaaPMRFactory(new AnalyticsDMaaPTestModule()); + } + + @Test + public void createPublisher() throws Exception { + DMaaPMRPublisher publisher = dmaapMRFactory.createPublisher(getPublisherConfig()); + DMaaPMRPublisherResponse response = publisher.publish(null); + assertThat(response.getResponseCode(), is(102)); + } + + @Test + public void createSubscriber() throws Exception { + DMaaPMRSubscriber dmaapMRSubscriber = dmaapMRFactory.createSubscriber(getSubscriberConfig("", "")); + DMaaPMRSubscriberResponse response = dmaapMRSubscriber.fetchMessages(); + assertThat(response.getResponseCode(), is(102)); + } + + @Test + public void create() throws Exception { + + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfigTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfigTest.java new file mode 100644 index 0000000..a66d3f2 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRPublisherConfigTest.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; + +import static org.junit.Assert.assertTrue; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_CONTENT_TYPE; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_PORT_NUMBER; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_PROTOCOL; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_USER_NAME; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_USER_PASSWORD; + +/** + * @author Rajiv Singla. Creation Date: 10/14/2016. + */ +public class DMaaPMRPublisherConfigTest extends BaseAnalyticsDMaaPUnitTest { + + + @Test + public void testPublisherConfigDefaults() throws Exception { + + final DMaaPMRPublisherConfig actualDefaultPublisherConfig = + new DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME).build(); + + final DMaaPMRPublisherConfig expectedDefaultPublisherConfig = + new DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(DEFAULT_PORT_NUMBER) + .setUserName(DEFAULT_USER_NAME) + .setUserPassword(DEFAULT_USER_PASSWORD) + .setContentType(DEFAULT_CONTENT_TYPE) + .setProtocol(DEFAULT_PROTOCOL) + .setMaxBatchSize(DEFAULT_PUBLISHER_MAX_BATCH_SIZE) + .setMaxRecoveryQueueSize(DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .build(); + + assertTrue("Default Publisher Config parameters must match", + actualDefaultPublisherConfig.equals(expectedDefaultPublisherConfig)); + + } + + + @Test + public void testPublisherCustomConfig() throws Exception { + + + final DMaaPMRPublisherConfig actualCustomPublisherConfig = getPublisherConfig(); + + final DMaaPMRPublisherConfig expectedCustomPublisherConfig = + new DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setContentType(CONTENT_TYPE) + .setProtocol(HTTP_PROTOCOL) + .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE) + .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .build(); + + assertTrue("Custom Publisher Config parameters must match", + actualCustomPublisherConfig.equals(expectedCustomPublisherConfig)); + } + + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfigTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfigTest.java new file mode 100644 index 0000000..10edc0c --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/domain/config/DMaaPMRSubscriberConfigTest.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.domain.config; + +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; + +import static org.junit.Assert.assertTrue; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_GROUP_PREFIX; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_MESSAGE_LIMIT; +import static org.openecomp.dcae.analytics.common.AnalyticsConstants.DEFAULT_SUBSCRIBER_TIMEOUT_MS; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_CONTENT_TYPE; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_PORT_NUMBER; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_PROTOCOL; +import static org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRConfig.DEFAULT_USER_NAME; + +/** + * @author Rajiv Singla. Creation Date: 10/14/2016. + */ +public class DMaaPMRSubscriberConfigTest extends BaseAnalyticsDMaaPUnitTest { + + @Test + public void testSubscriberConfigDefaults() throws Exception { + + DMaaPMRSubscriberConfig actualDefaultSubscriberConfig = + new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setConsumerGroup(DEFAULT_SUBSCRIBER_GROUP_PREFIX + SUBSCRIBER_CONSUMER_ID) + .setConsumerId(SUBSCRIBER_CONSUMER_ID).build(); + + DMaaPMRSubscriberConfig expectedSubscriberConfig = + new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(DEFAULT_PORT_NUMBER) + .setUserName(DEFAULT_USER_NAME) + .setContentType(DEFAULT_CONTENT_TYPE) + .setProtocol(DEFAULT_PROTOCOL) + .setConsumerGroup(DEFAULT_SUBSCRIBER_GROUP_PREFIX + SUBSCRIBER_CONSUMER_ID) + .setConsumerId(SUBSCRIBER_CONSUMER_ID) + .setMessageLimit(DEFAULT_SUBSCRIBER_MESSAGE_LIMIT) + .setTimeoutMS(DEFAULT_SUBSCRIBER_TIMEOUT_MS) + .build(); + + assertTrue("Default Subscriber Config parameters must match", + actualDefaultSubscriberConfig.equals(expectedSubscriberConfig)); + + } + + + @Test + public void testSubscriberCustomConfig() throws Exception { + + DMaaPMRSubscriberConfig actualSubscriberCustomConfig = getSubscriberConfig(SUBSCRIBER_CONSUMER_ID, + SUBSCRIBER_CONSUMER_GROUP_NAME); + + DMaaPMRSubscriberConfig expectedSubscriberCustomConfig = + new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setContentType(CONTENT_TYPE) + .setProtocol(HTTP_PROTOCOL) + .setConsumerGroup(SUBSCRIBER_CONSUMER_GROUP_NAME) + .setConsumerId(SUBSCRIBER_CONSUMER_ID) + .setMessageLimit(SUBSCRIBER_MESSAGE_LIMIT) + .setTimeoutMS(SUBSCRIBER_TIMEOUT_MS) + .build(); + + assertTrue("Custom Subscriber Config parameters must match", + actualSubscriberCustomConfig.equals(expectedSubscriberCustomConfig)); + + } + + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/BaseAnalyticsDMaaPIT.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/BaseAnalyticsDMaaPIT.java new file mode 100644 index 0000000..987b57b --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/BaseAnalyticsDMaaPIT.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.it; + +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.test.BaseDCAEAnalyticsIT; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.of; + +/** + * Base class for all DCAE DMaaP Integration Tests + * <p> + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public abstract class BaseAnalyticsDMaaPIT extends BaseDCAEAnalyticsIT { + + // Integration Test Settings + protected static final String HOST_NAME = "mrlocal-mtnjftle01.homer.com"; + protected static final Integer PORT_NUMBER = 3905; + protected static final String TOPIC_NAME = "com.dcae.dmaap.mtnje2.DcaeTestVESPub"; + + protected static final String USERNAME = "m00502@tca.af.dcae.com"; + protected static final String PASSWORD = "Te5021abc"; + protected static final String HTTP_PROTOCOL = "https"; + protected static final String CONTENT_TYPE = "application/json"; + + protected static final int PUBLISHER_MAX_BATCH_QUEUE_SIZE = 20; + protected static final int PUBLISHER_MAX_RECOVERY_QUEUE_SIZE = 200; + + protected static final String SUBSCRIBER_CONSUMER_ID = "123"; + protected static final String SUBSCRIBER_CONSUMER_GROUP_NAME = "testGonsumerName-" + SUBSCRIBER_CONSUMER_ID; + protected static final int SUBSCRIBER_TIMEOUT_MS = 2000; + protected static final int SUBSCRIBER_MESSAGE_LIMIT = 20; + + /** + * Creates Sample Publisher settings for integration testing purposes + * + * @return + */ + protected static DMaaPMRPublisherConfig getPublisherConfig() { + return new DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE) + .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .build(); + } + + /** + * Creates Sample Subscriber settings for integration testing purposes + * + * @return + */ + protected static DMaaPMRSubscriberConfig getSubscriberConfig(String consumerId) { + return new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setUserName(USERNAME) + .setUserPassword(PASSWORD) + .setConsumerGroup(SUBSCRIBER_CONSUMER_GROUP_NAME) + .setConsumerId(consumerId != null ? consumerId : SUBSCRIBER_CONSUMER_ID) + .setTimeoutMS(SUBSCRIBER_TIMEOUT_MS) + .setMessageLimit(SUBSCRIBER_MESSAGE_LIMIT).build(); + } + + /** + * Publishes 2 sample message to DMaaP Topic for integration test purposes + * + * @param dMaaPMRPublisher + * @return + */ + protected static DMaaPMRPublisherResponse publishTwoSampleMessages(DMaaPMRPublisher dMaaPMRPublisher) { + DMaaPMRPublisherResponse dMaaPMRPublisherResponse = dMaaPMRPublisher.publish(getTwoSampleMessage()); + return dMaaPMRPublisherResponse; + } + + protected static List<String> getTwoSampleMessage() { + String message1 = "{ \"message\" : \"Test Message1\"}"; + String message2 = "{ \"message\" : \"Test Message2\"}"; + return of(message1, message2); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRPublisherImplIT.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRPublisherImplIT.java new file mode 100644 index 0000000..a7e1f8c --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRPublisherImplIT.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.it; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; + +/** + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRPublisherImplIT extends BaseAnalyticsDMaaPIT { + + private DMaaPMRPublisher dMaaPMRPublisher; + + @Before + public void before() throws Exception { + DMaaPMRFactory dMaaPMRFactory = DMaaPMRFactory.create(); + dMaaPMRPublisher = dMaaPMRFactory.createPublisher(getPublisherConfig()); + } + + @Test + public void testPublish() throws Exception { + long pendingMessageCount = publishTwoSampleMessages(dMaaPMRPublisher).getPendingMessagesCount(); + Assert.assertTrue("Published Message Count must be 2", pendingMessageCount == 2); + } + + @Test + public void testFlush() throws Exception { + publishTwoSampleMessages(dMaaPMRPublisher); + DMaaPMRPublisherResponse publisherResponse = dMaaPMRPublisher.flush(); + Integer responseCode = publisherResponse.getResponseCode(); + Assert.assertTrue("Server Response code must be 200", responseCode == 200); + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRSubscriberImplIT.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRSubscriberImplIT.java new file mode 100644 index 0000000..4b32e96 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/it/DMaaPMRSubscriberImplIT.java @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.it; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; + +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla. Creation Date: 10/13/2016. + */ +public class DMaaPMRSubscriberImplIT extends BaseAnalyticsDMaaPIT { + + private DMaaPMRPublisher dMaaPMRPublisher; + private DMaaPMRSubscriber dMaaPMRSubscriber; + + @Before + public void before() throws Exception { + String randomConsumerID = UUID.randomUUID().toString(); + DMaaPMRFactory dMaaPMRFactory = DMaaPMRFactory.create(); + dMaaPMRSubscriber = dMaaPMRFactory.createSubscriber(getSubscriberConfig(randomConsumerID)); + dMaaPMRPublisher = dMaaPMRFactory.createPublisher(getPublisherConfig()); + } + + @After + public void after() throws Exception { + dMaaPMRSubscriber.close(); + dMaaPMRPublisher.close(); + } + + + @Test + public void testFetchMessages() throws Exception { + + // This call is used to just register a brand new subscriber with DMaaP + DMaaPMRSubscriberResponse subscriberRegistrationResponse = dMaaPMRSubscriber.fetchMessages(); + assertTrue("Subscriber Registration Response code must be 200 confirming subscriber was registered " + + "successfully", subscriberRegistrationResponse.getResponseCode() == 200); + assertTrue("Subscriber Registration Response must not contain any messages", subscriberRegistrationResponse + .getFetchedMessages().size() == 0); + + // Force push couple of test messages + DMaaPMRPublisherResponse publisherResponse = dMaaPMRPublisher.forcePublish(getTwoSampleMessage()); + assertTrue("Message must be posted successfully before subscriber can fetch it", publisherResponse + .getResponseCode() == 200); + + // Now fetch messages from DMaaP + DMaaPMRSubscriberResponse subscriberResponse = dMaaPMRSubscriber.fetchMessages(); + List<String> messageList = new LinkedList<>(); + for (String message : subscriberResponse.getFetchedMessages()) { + messageList.add(message); + } + assertTrue("Subscriber message count must be 2", messageList.size() == 2); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPTestModule.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPTestModule.java new file mode 100644 index 0000000..7ef44aa --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/module/AnalyticsDMaaPTestModule.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.module; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherFactory; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherMockImpl; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueueFactory; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueueImpl; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriberFactory; +import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriberMockImpl; + +/** + * DMaaP Guice Test Module + * <p> + * @author Rajiv Singla. Creation Date: 10/20/2016. + */ +public class AnalyticsDMaaPTestModule extends AbstractModule { + + + @Override + protected void configure() { +// Bind Http Client + bind(CloseableHttpClient.class).toInstance(HttpClients.createDefault()); + + // Bind Publishing queue + install(new FactoryModuleBuilder().implement(DMaaPMRPublisherQueue.class, DMaaPMRPublisherQueueImpl.class) + .build(DMaaPMRPublisherQueueFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRPublisher.class, DMaaPMRPublisherMockImpl.class) + .build(DMaaPMRPublisherFactory.class)); + + install(new FactoryModuleBuilder().implement(DMaaPMRSubscriber.class, DMaaPMRSubscriberMockImpl.class) + .build(DMaaPMRSubscriberFactory.class)); + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponentTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponentTest.java new file mode 100644 index 0000000..4cc3f1e --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/BaseDMaaPMRComponentTest.java @@ -0,0 +1,340 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Optional; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.ResponseHandler; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +/** + * @author Manjesh Gowda. Creation Date: 11/4/2016. + */ +public class BaseDMaaPMRComponentTest extends BaseAnalyticsDMaaPUnitTest { + + //region Get Authorization Header Tests + @Test + public void testGetAuthHeaderWithGoodValues() { + String expectedEncodedString = "Basic bTAwNTAyQHRjYS5hZi5kY2FlLmNvbTpUZTUwMjFhYmM="; + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("m00502@tca.af.dcae.com", "Te5021abc"); + assertTrue(" Authentication Header has value ", actualOutput.isPresent()); + assertEquals(" Authentication Header has value ", expectedEncodedString, actualOutput.get()); + } + + @Test + public void testGetAuthHeaderWithNullValues() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader(null, null); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithUserNullValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("m00502@tca.af.dcae.com", null); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithPasswordNullValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader(null, "Te5021abc"); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + /* @Test + public void testGetAuthHeaderWithEmptyValues() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("", ""); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithUserEmptyValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("m00502@tca.af.dcae.com", ""); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithPasswordEmptyValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("", "Te5021abc"); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + }*/ + //endregion + + //region Publisher URI Tests + @Test + public void testCreatePublishURIWithGoodValues() { + URI actualURI = BaseDMaaPMRComponent.createPublisherURI(getPublisherConfig()); + String test = actualURI.toString(); + assertEquals("Generated Publisher URL is correct", + "https://testHostName:8080/events/testTopicName", actualURI.toString()); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testCreatePublishURIWithURISyntaxException() { + DMaaPMRPublisherConfig badPublisherConfig = new DMaaPMRPublisherConfig + .Builder(" dav /gh. ss/ asd ", "///@$%#-htps:<>!##") + .setPortNumber(0) + .setProtocol("https").build(); + + URI actualURI = BaseDMaaPMRComponent.createPublisherURI(badPublisherConfig); + } + + //endregion + + //region Subscriber URI Tests + @Test + public void testCreateSubscribeURIWithGoodValues() { + URI actualURI = BaseDMaaPMRComponent.createSubscriberURI( + getSubscriberConfig("test-consumer-group", "test-consumer-id")); + assertEquals("Generated Subscriber URL is correct", + "https://testHostName:8080/events/testTopicName/" + + "test-consumer-id/test-consumer-group?timeout=2000&limit=20", + actualURI.toString()); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testCreateSubscribeURIWithURISyntaxException() { + DMaaPMRSubscriberConfig badSubscriberConfig = new DMaaPMRSubscriberConfig + .Builder(" dav /gh. ss/ asd ", "") + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE).build(); + + URI actualURI = BaseDMaaPMRComponent.createSubscriberURI(badSubscriberConfig); + } + + //endregion + + //region Convert toJSON String tests + @Test + public void testConvertToJsonStringGoodJsonStringList() { + List<String> jsonMessage = Arrays.asList( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}"); + + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + + String expectedJSONMsg = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + + } + + @Rule + public ExpectedException expectedJsonProcessingException = ExpectedException.none(); + + @Test + public void testConvertToJsonStringBadJsonStringList() { + expectedJsonProcessingException.expect(DCAEAnalyticsRuntimeException.class); + expectedJsonProcessingException.expectCause(isA(JsonProcessingException.class)); + + List<String> jsonMessage = Arrays.asList( + "{\"message\":\"I'm Object 1 Message\"", + "\"message\":\"I'm Object 2 Message\""); + + BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + } + + @Test + public void testConvertToJsonStringWithEmptyList() { + List<String> jsonMessage = Arrays.asList(); + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + String expectedJSONMsg = "[]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + } + + @Test + public void testConvertToJsonStringWithNullList() { + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(null); + String expectedJSONMsg = "[]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + } + + //endregion + + //region Convert JSONtoString String tests + + @Test + public void testConvertJsonToStringMessagesGoodValues() { + String inputJSONMsg = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(2)); + assertThat(actualList, containsInAnyOrder( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}" + )); + } + + @Test + public void testConvertJsonToStringMessagesNoValues() { + String inputJSONMsg = "[]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(0)); + } + + @Test + public void testConvertJsonToStringMessagesNullValues() { + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(null); + assertThat(actualList, hasSize(0)); + } + + @Test + public void testConvertJsonToStringMessagesEmptyValues() { + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(" "); + assertThat(actualList, hasSize(0)); + } + + @Rule + public ExpectedException convertToJSONIOException = ExpectedException.none(); + + @Test + public void testConvertJsonToStringMessagesException() { + convertToJSONIOException.expect(DCAEAnalyticsRuntimeException.class); + convertToJSONIOException.expectCause(isA(IOException.class)); + + String inputJSONMsg = "[\"{\"message\":\"I'm Object 1 Message\"}\"," + + "\"{\"message\":\"I'm Object 2 Message\"}\"]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(2)); + assertThat(actualList, containsInAnyOrder( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}" + )); + } + + //endregion + + //region Test add message to recovery queue + @Test + public void testAddMessagesToRecoveryQueueAllGood() { + DMaaPMRPublisherQueue dmaapMRPublisherQueue = mock(DMaaPMRPublisherQueue.class); + given(dmaapMRPublisherQueue.addRecoverableMessages(Mockito.<String>anyList())).willReturn(0); + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0); + List<String> messages = new ArrayList<String>(); + BaseDMaaPMRComponent.addMessagesToRecoveryQueue(dmaapMRPublisherQueue, messages); + } + + @Rule + public ExpectedException addQueueIllegalException = ExpectedException.none(); + + @Test + public void testAddMessagesToRecoveryQueueException() { + addQueueIllegalException.expect(isA(DCAEAnalyticsRuntimeException.class)); + addQueueIllegalException.expectCause(isA(IllegalStateException.class)); + + DMaaPMRPublisherQueue dmaapMRPublisherQueue = mock(DMaaPMRPublisherQueue.class); + + given(dmaapMRPublisherQueue.addRecoverableMessages(Mockito.<String>anyList())) + .willThrow(IllegalStateException.class); + List<String> messages = new ArrayList<String>(); + + BaseDMaaPMRComponent.addMessagesToRecoveryQueue(dmaapMRPublisherQueue, messages); + } + + //endregion + + //region Miscellaneous tests + + @Test + public void testResponseHandler() { + HttpResponse mockHttpResponse = mock(HttpResponse.class); + StatusLine mockStatusLine = mock(StatusLine.class); + HttpEntity mockHttpEntity = mock(HttpEntity.class); + // Could not mock EntityUtils as it's final class + //EntityUtils mockEntityUtils = mock(EntityUtils.class); + + given(mockHttpResponse.getStatusLine()).willReturn(mockStatusLine); + given(mockStatusLine.getStatusCode()).willReturn(200); + given(mockHttpResponse.getEntity()).willReturn(null); + //given(mockEntityUtils.toString()).willReturn("Test value"); + + ResponseHandler<Pair<Integer, String>> responseHandler = BaseDMaaPMRComponent.responseHandler(); + try { + Pair<Integer, String> mappedResponse = responseHandler.handleResponse(mockHttpResponse); + assertTrue("Http response code returned properly ", mappedResponse.getLeft().equals(200)); + assertTrue("Http response body returned properly ", mappedResponse.getRight().equals("")); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + //endregion + + //region createSubscriberResponse tests + @Test + public void testCreateSubscriberResponse() { + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = + BaseDMaaPMRComponent.createSubscriberResponse(200, "Test Message", getTwoSampleMessages()); + + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertEquals(dmaapMRSubscriberResponse.getResponseMessage(), "Test Message"); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages().size(), is(2)); + + } + + @Test + public void testCreateSubscriberResponse_no_message() { + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = + BaseDMaaPMRComponent.createSubscriberResponse(200, "Test Message", null); + + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertEquals(dmaapMRSubscriberResponse.getResponseMessage(), "Test Message"); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages().size(), is(0)); + + } + + //endregion +} + + + + + diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java new file mode 100644 index 0000000..a24bb39 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java @@ -0,0 +1,212 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.CloseableHttpClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * @author Rajiv Singla. Creation Date: 10/21/2016. + */ +@RunWith(MockitoJUnitRunner.class) +public class DMaaPMRPublisherImplTest extends BaseAnalyticsDMaaPUnitTest { + + @Mock + DMaaPMRPublisherQueueFactory dmaapMRPublisherQueueFactory; + @Mock + CloseableHttpClient closeableHttpClient; + @Mock + DMaaPMRPublisherQueue dmaapMRPublisherQueue; + + @Before + public void setUp() throws Exception { + given(dmaapMRPublisherQueueFactory.create(Mockito.anyInt(), Mockito.anyInt())) + .willReturn(dmaapMRPublisherQueue); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testPublishSmallMessageList() throws Exception { + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(10); + given(dmaapMRPublisherQueue.addBatchMessages(Mockito.<String>anyList())).willReturn(2); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + + DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages()); + + assertThat(dmaapMRPublisherResponse.getResponseCode(), is(202)); + assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(2)); + assertThat(dmaapMRPublisherResponse.getResponseMessage(), + is("Accepted - Messages queued for batch publishing to MR Topic")); + } + + @Test + public void testPublishBigMessageList() throws Exception { + + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0); + given(dmaapMRPublisherQueue.getMessageForPublishing()).willReturn(getTwoSampleMessages()); + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + + DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages()); + + assertThat(dmaapMRPublisherResponse.getResponseCode(), is(200)); + assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(200)); + assertThat(dmaapMRPublisherResponse.getResponseMessage(), is("Message successfully posted")); + } + + @Test + public void testForcePublishSuccessful() throws Exception { + DMaaPMRPublisherConfig dmaapMRPublisherConfig = new + DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build(); + + HttpPost httpPost = Mockito.mock(HttpPost.class); + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + dmaapMRPublisherConfig, dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + assertThat(response.getResponseCode(), is(200)); + } + + @Test + public void testForcePublishFailure() throws Exception { + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(503, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + assertThat(response.getResponseCode(), is(503)); + } + + @Rule + public ExpectedException httpIOException = ExpectedException.none(); + + @Test + public void testForcePublishHttpFailure() throws Exception { + + httpIOException.expect(DCAEAnalyticsRuntimeException.class); + httpIOException.expectCause(isA(IOException.class)); + + given(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + } + + @Test + public void testFlushSuccessful() throws Exception { + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush(); + assertThat(response.getResponseCode(), is(200)); + } + + @Test + public void testFlushEmptyList() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush(); + assertThat(response.getResponseCode(), is(204)); + } + + @Test + public void testClose() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList()); + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + dmaapMRPublisherImpl.close(); + verify(closeableHttpClient).execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)); + } + + @Test + public void testCloseUnsuccessful() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList()); + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(400, "Message successfully posted")); + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + dmaapMRPublisherImpl.close(); + verify(closeableHttpClient, times(6)).execute(Mockito.any(HttpUriRequest.class), + Mockito.any(ResponseHandler.class)); + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java new file mode 100644 index 0000000..333b816 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl; + +import java.util.Date; +import java.util.List; + +/** + * @author Rajiv Singla. Creation Date: 10/21/2016. + */ +public class DMaaPMRPublisherMockImpl implements DMaaPMRPublisher { + + @Override + public DMaaPMRPublisherResponse publish(List<String> messages) throws DCAEAnalyticsRuntimeException { + return new DMaaPMRPublisherResponseImpl(102, "Mock Response", 100); + } + + @Override + public DMaaPMRPublisherResponse forcePublish(List<String> messages) throws DCAEAnalyticsRuntimeException { + return null; + } + + @Override + public DMaaPMRPublisherResponse flush() { + return null; + } + + @Override + public Date getPublisherCreationTime() { + return null; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java new file mode 100644 index 0000000..07622df --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java @@ -0,0 +1,189 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import org.junit.Test; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; + +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** + * + * @author Rajiv Singla. Creation Date: 11/2/2016. + */ +public class DMaaPMRPublisherQueueImplTest extends BaseAnalyticsDMaaPUnitTest { + + + @Test + public void testAddBatchMessages() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two more message to batch queue + final int batchMessagesSizeAfterSecondInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 4", batchMessagesSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 4 messages to publish", messagesToPublish.size() == 4); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + + } + + @Test(expected = IllegalStateException.class) + public void testAddBatchMessagesWhenQueueSizeIsFull() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(2, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add 2 more messages should now throw IllegalStateException + publisherQueue.addBatchMessages(getTwoSampleMessages()); + } + + @Test + public void testAddRecoverableMessages() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages + final int recoverableMessageSizeAfterSecondInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 4 after second insert", + recoverableMessageSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20); + } + + + @Test(expected = IllegalStateException.class) + public void testAddRecoverableMessagesWhenRecoveryQueueIsFull() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 2); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages which should throw IllegalStateException + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + } + + @Test + public void testGetMessageForPublishing() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages + final int recoverableMessageSizeAfterSecondInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 4 after second insert", + recoverableMessageSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6); + // add two more batch and recovery messages + final int batchQueueSize = publisherQueue.addBatchMessages(getTwoSampleMessages()); + final int recoveryQueueSize = publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + final int messagePublishCount = publisherQueue.getMessageForPublishing().size(); + assertTrue("Batch Queue + Recovery Queue message total must batch publish message count", + messagePublishCount == (batchQueueSize + recoveryQueueSize)); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20); + + } + + @Test + public void testGetBatchQueueRemainingSize() throws Exception { + + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + assertTrue("Batch remaining capacity should be reduced by 2", + publisherQueue.getBatchQueueRemainingSize() == 8); + + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + + // recoverable message should not change batch queue capacity + assertTrue("Adding recoverable Message must not have any impact on batch queue remaining capacity ", + publisherQueue.getBatchQueueRemainingSize() == 8); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4); + + // Batch queue remaining capacity should now match original batch size + assertTrue("Batch Queue remaining capacity must match original batch queue size", publisherQueue + .getBatchQueueRemainingSize() == 10); + } + + @Test + public void testGetRecoveryQueueRemainingSize() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + assertTrue("Recovery Queue remaining capacity should be reduced by 2", + publisherQueue.getRecoveryQueueRemainingSize() == 18); + + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + + // batch message should not change recoverable queue capacity + assertTrue("Adding batch queue Message must not have any impact on recovery queue remaining capacity ", + publisherQueue.getRecoveryQueueRemainingSize() == 18); + + // Now get all messages which must drain out recovery queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4); + + // Recoverable queue remaining capacity should now match original recovery queue size + assertTrue("Recoverable Queue remaining capacity must match original batch queue size", publisherQueue + .getRecoveryQueueRemainingSize() == 20); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java new file mode 100644 index 0000000..585d0ba --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.publisher; + +import java.util.List; + +/** + * @author Manjesh Gowda. Creation Date: 11/7/2016. + */ +public class DMaaPMRPublisherQueueMockImpl implements DMaaPMRPublisherQueue { + @Override + public int addBatchMessages(List<String> batchMessages) throws IllegalStateException { + return 100; + } + + @Override + public int addRecoverableMessages(List<String> recoverableMessages) throws IllegalStateException { + return 0; + } + + @Override + public List<String> getMessageForPublishing() { + return null; + } + + @Override + public int getBatchQueueRemainingSize() { + return 0; + } + + @Override + public int getRecoveryQueueRemainingSize() { + return 0; + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java new file mode 100644 index 0000000..6d8c79a --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.subscriber; + +import com.jayway.jsonassert.impl.matcher.IsCollectionWithSize; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.CloseableHttpClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.openecomp.dcae.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; + +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.BDDMockito.given; + +/** + * @author Rajiv Singla. Creation Date: 10/21/2016. + */ +@RunWith(MockitoJUnitRunner.class) +public class DMaaPMRSubscriberImplTest extends BaseAnalyticsDMaaPUnitTest { + + @Mock + CloseableHttpClient closeableHttpClient; + + private String consumerGroup, consumerId; + + @Before + public void setUp() throws Exception { + Random random = new Random(10000L); + consumerGroup = "Test-Consumer-Group" + Long.toString(random.nextLong()); + consumerId = UUID.randomUUID().toString(); + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testSubscriberSuccessfullyReceiveDmaapMessage() throws Exception { + + String testMessages = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, testMessages)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(2)); + } + + @Test + public void testSubscriberSuccessfullyReceiveDmaapMessageWithNoUsername() throws Exception { + + DMaaPMRSubscriberConfig dmaapMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setConsumerGroup(consumerGroup != null ? consumerGroup : SUBSCRIBER_CONSUMER_GROUP_NAME) + .setConsumerId(consumerId != null ? consumerId : SUBSCRIBER_CONSUMER_ID) + .setTimeoutMS(SUBSCRIBER_TIMEOUT_MS) + .setMessageLimit(SUBSCRIBER_MESSAGE_LIMIT).build(); + + String testMessages = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, testMessages)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + dmaapMRSubscriberConfig, closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(2)); + } + + @Test + public void testSubscriberSuccessfullyReceiveNoDmaapMessage() throws Exception { + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, null)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(0)); + } + + @Test + public void testSubscriberSuccessfullyReceiveErrorMessage() throws Exception { + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(400, "Bad Request")); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(400)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(0)); + } + + @Rule + public ExpectedException httpIOException = ExpectedException.none(); + + @Test + public void testSubscriberSuccessfullyReceiveException() throws Exception { + + httpIOException.expect(DCAEAnalyticsRuntimeException.class); + httpIOException.expectCause(isA(IOException.class)); + + given(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java new file mode 100644 index 0000000..87a76d0 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START========================================================= + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.analytics.dmaap.service.subscriber; + +import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl; + +import java.util.Date; + +/** + * @author Rajiv Singla. Creation Date: 10/21/2016. + */ +public class DMaaPMRSubscriberMockImpl implements DMaaPMRSubscriber { + + @Override + public DMaaPMRSubscriberResponse fetchMessages() throws DCAEAnalyticsRuntimeException { + return new DMaaPMRSubscriberResponseImpl(102, "Mock Response", null); + } + + @Override + public Date getSubscriberCreationTime() { + return null; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dcae-analytics-dmaap/src/test/resources/logback-test.xml b/dcae-analytics-dmaap/src/test/resources/logback-test.xml new file mode 100644 index 0000000..e1cb987 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/resources/logback-test.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START========================================================= + ~ dcae-analytics + ~ ================================================================================ + ~ Copyright © 2017 AT&T Intellectual Property. All rights reserved. + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<configuration debug="false"> + + <!-- + Disabling some chatty loggers. + --> + <logger name="org.apache.commons.beanutils" level="ERROR"/> + <logger name="org.apache.zookeeper.server" level="ERROR"/> + <logger name="org.apache.zookeeper" level="ERROR"/> + <logger name="com.ning" level="WARN"/> + <logger name="org.apache.spark" level="WARN"/> + <logger name="org.spark-project" level="WARN"/> + <logger name="org.apache.hadoop" level="WARN"/> + <logger name="org.apache.hive" level="WARN"/> + <logger name="org.quartz.core" level="WARN"/> + <logger name="org.eclipse.jetty" level="WARN"/> + <logger name="io.netty.util.internal" level="WARN"/> + + <logger name="org.apache.twill" level="WARN"/> + <logger name="co.cask.cdap" level="INFO"/> + <logger name="org.openecomp" level="DEBUG"/> + + <appender name="Console" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <root level="ERROR"> + <appender-ref ref="Console"/> + </root> + + +</configuration> + |