aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-it
diff options
context:
space:
mode:
authoran4828 <nekrassov@att.com>2017-08-21 11:05:08 -0400
committerLusheng Ji <lji@research.att.com>2017-08-24 00:56:45 +0000
commite86be39dc5ff812b73398e0720aa3fbf0c48213c (patch)
treefe3198f180128163490c14c66a1d6074760b220b /dcae-analytics-it
parentff6a13c7ce03ec95fba9d0b4f04b74d0bfeb6a47 (diff)
Initial TCA commit into DCAEGEN2
Change-Id: I5f7f8af2a00419854cafc34b79277df60d1af095 Issue-ID: DCAEGEN2-53 Signed-off-by: an4828 <nekrassov@att.com>
Diffstat (limited to 'dcae-analytics-it')
-rw-r--r--dcae-analytics-it/pom.xml214
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/CucumberRunnerIT.java40
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/steps/DMaaPMRSteps.java129
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreator.java83
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreatorImpl.java225
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/AnalyticsITInjectorSource.java41
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/IntegrationTestModule.java126
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/BaseAnalyticsPluginsIT.java51
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/DMaaPMRSourcePluginIT.java204
-rw-r--r--dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/util/StepUtils.java43
-rw-r--r--dcae-analytics-it/src/test/resources/cucumber.properties21
-rw-r--r--dcae-analytics-it/src/test/resources/cucumber/features/dmaap/message_router.feature10
-rw-r--r--dcae-analytics-it/src/test/resources/data/json_message.json3
-rw-r--r--dcae-analytics-it/src/test/resources/env/dev.properties46
-rw-r--r--dcae-analytics-it/src/test/resources/logback-test.xml55
15 files changed, 1291 insertions, 0 deletions
diff --git a/dcae-analytics-it/pom.xml b/dcae-analytics-it/pom.xml
new file mode 100644
index 0000000..4e849a7
--- /dev/null
+++ b/dcae-analytics-it/pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ===============================LICENSE_START======================================
+ ~ dcae-analytics
+ ~ ================================================================================
+ ~ Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============================LICENSE_END===========================================
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>dcae-analytics</artifactId>
+ <groupId>org.openecomp.dcae.apod.analytics</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+
+ <artifactId>dcae-analytics-it</artifactId>
+ <packaging>jar</packaging>
+
+
+ <!-- THIS MODULE CONTAINS INTEGRATION TESTS FOR ALL DCAE ANALYTICS MODULES - MUST NOT BE USED FOR PROD DEPLOYMENT-->
+ <name>DCAE Analytics Integration Tests</name>
+ <description>Contains Integration Tests for all DCAE Analytics Modules</description>
+
+
+ <properties>
+ <main.basedir>${project.parent.basedir}</main.basedir>
+ </properties>
+
+
+ <dependencies>
+
+ <!-- DCAE PROJECT DEPENDENCIES -->
+ <dependency>
+ <groupId>org.openecomp.dcae.apod.analytics</groupId>
+ <artifactId>dcae-analytics-cdap-plugins</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <!-- CDAP -->
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-etl-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-etl-api-spark</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-etl-realtime</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-data-pipeline</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ </dependency>
+
+ <!-- CASK -->
+ <dependency>
+ <groupId>co.cask.http</groupId>
+ <artifactId>netty-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>co.cask.common</groupId>
+ <artifactId>common-http</artifactId>
+ </dependency>
+
+ <!-- HADOOP -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+
+ <!-- SPARK -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ </dependency>
+
+ <!-- LOGGING -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <!-- FIND BUGS -->
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${findbugs.jsr305.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ <version>${findbugs.annotations.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- TEST DEPENDENCIES -->
+ <dependency>
+ <groupId>org.openecomp.dcae.apod.analytics</groupId>
+ <artifactId>dcae-analytics-test</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-unit-test</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>org.apache.httpcomponents</artifactId>
+ <groupId>httpcore</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>hydrator-test</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>co.cask.cdap</groupId>
+ <artifactId>cdap-data-streams</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>info.cukes</groupId>
+ <artifactId>cucumber-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>info.cukes</groupId>
+ <artifactId>cucumber-guice</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>info.cukes</groupId>
+ <artifactId>cucumber-junit</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx2048m -Djava.awt.headless=true -XX:+UseConcMarkSweepGC
+ -XX:OnOutOfMemoryError="kill -9 %p" -XX:+HeapDumpOnOutOfMemoryError
+ -Dcuke4duke.objectFactory=cuke4duke.internal.jvmclass.GuiceFactory
+ </argLine>
+ <systemPropertyVariables>
+ <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ <logback.configurationFile>
+ ${basedir}/src/test/resources/logback-test.xml
+ </logback.configurationFile>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/CucumberRunnerIT.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/CucumberRunnerIT.java
new file mode 100644
index 0000000..83153a0
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/CucumberRunnerIT.java
@@ -0,0 +1,40 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.cucumber;
+
+import cucumber.api.CucumberOptions;
+import cucumber.api.SnippetType;
+import cucumber.api.junit.Cucumber;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+
+@RunWith(Cucumber.class)
+@CucumberOptions(
+ plugin = {"pretty", "json:target/cucumber/cucumber.json", "html:target/cucumber"},
+ features = {"src/test/resources/cucumber/features/dmaap"},
+ glue = {"org.openecomp.dcae.apod.analytics.it.cucumber.steps"},
+ snippets = SnippetType.CAMELCASE
+)
+public class CucumberRunnerIT {
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/steps/DMaaPMRSteps.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/steps/DMaaPMRSteps.java
new file mode 100644
index 0000000..c341c2a
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/cucumber/steps/DMaaPMRSteps.java
@@ -0,0 +1,129 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.cucumber.steps;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import cucumber.api.java.en.And;
+import cucumber.api.java.en.Given;
+import cucumber.api.java.en.Then;
+import cucumber.api.java.en.When;
+import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.openecomp.dcae.apod.analytics.it.dmaap.DMaaPMRCreator;
+import org.openecomp.dcae.apod.analytics.it.util.StepUtils;
+import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsIT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+public class DMaaPMRSteps extends BaseDCAEAnalyticsIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSteps.class);
+
+ private final DMaaPMRCreator dMaaPMRCreator;
+ private final String defaultPublisherTopicName;
+
+ private static DMaaPMRSubscriberResponse subscriberResponse;
+ private static String messageToPublish;
+ private static String fetchedMessage;
+
+ @Inject
+ public DMaaPMRSteps(DMaaPMRCreator dMaaPMRCreator,
+ @Named("dmaap.mr.publisher.topicName") String defaultPublisherTopicName) {
+ this.dMaaPMRCreator = dMaaPMRCreator;
+ this.defaultPublisherTopicName = defaultPublisherTopicName;
+ }
+
+
+ @Given("^DMaaP MR Service is up$")
+ public void dmaapMRServiceIsUp() throws Throwable {
+ final DMaaPMRSubscriber subscriber =
+ dMaaPMRCreator.getDMaaPMRSubscriberWithTopicName(defaultPublisherTopicName);
+ final DMaaPMRSubscriberResponse subscriberResponse = subscriber.fetchMessages();
+ assertNotNull(subscriberResponse.getResponseCode());
+ assertTrue(HTTPUtils.isSuccessfulResponseCode(subscriberResponse.getResponseCode()));
+ LOG.info("Subscriber is able to fetch messages successfully - Verified DMaaP MR Service is UP");
+ }
+
+ @When("^I publish json message to publisher topic name \"([^\"]*)\" in file \"([^\"]*)\"$")
+ public void iPublishJsonMessageToPublisherTopicNameInFile(String publisherTopicName, String fileLocation)
+ throws Throwable {
+ String publisherTopic;
+ if (StepUtils.isDefaultPublisherTopic(publisherTopicName)) {
+ publisherTopic = defaultPublisherTopicName;
+ } else {
+ publisherTopic = publisherTopicName;
+ }
+ final DMaaPMRPublisher publisher = dMaaPMRCreator.getDMaaPMRPublisherWithTopicName(publisherTopic);
+ messageToPublish = fromStream(fileLocation);
+ final DMaaPMRPublisherResponse publisherResponse = publisher.publish(Arrays.asList(messageToPublish));
+ LOG.info("Publisher published messages to DMaaP MR Topic - Response: {}", publisherResponse);
+ assertTrue(HTTPUtils.isSuccessfulResponseCode(publisherResponse.getResponseCode()));
+ }
+
+ @And("^wait for \"([^\"]*)\" seconds$")
+ public void waitForSeconds(Integer waitInSeconds) throws Throwable {
+ TimeUnit.SECONDS.sleep(waitInSeconds);
+ LOG.info("Waking up after sleep: {} seconds", waitInSeconds);
+ }
+
+ @And("^subscriber fetch message from publisher topic name \"([^\"]*)\"$")
+ public void fetchMessageFrom(String publisherTopicName) throws Throwable {
+ String publisherTopic;
+ if (StepUtils.isDefaultPublisherTopic(publisherTopicName)) {
+ publisherTopic = defaultPublisherTopicName;
+ } else {
+ publisherTopic = publisherTopicName;
+ }
+ final DMaaPMRSubscriber subscriber = dMaaPMRCreator.getDMaaPMRSubscriberWithTopicName(publisherTopic);
+ subscriberResponse = subscriber.fetchMessages();
+ LOG.info("Subscriber fetched messages to DMaaP MR Topic - Response: {}", subscriberResponse);
+ assertTrue(HTTPUtils.isSuccessfulResponseCode(subscriberResponse.getResponseCode()));
+ }
+
+ @And("^compare fetched json message with published message$")
+ public void compareFetchedJsonMessageWithPublishedMessage() throws Throwable {
+
+ fetchedMessage = subscriberResponse.getFetchedMessages().get(0);
+ LOG.info("Fetched Json Message: {}", fetchedMessage);
+ LOG.info("Published Json Message: {}", messageToPublish);
+ }
+
+ @Then("^fetched message must be same as published message$")
+ public void fetchedMessageMustBeSameAsPublishedMessage() throws Throwable {
+ assertJson(messageToPublish, fetchedMessage);
+ }
+
+
+
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreator.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreator.java
new file mode 100644
index 0000000..12a31a5
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreator.java
@@ -0,0 +1,83 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.dmaap;
+
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+
+import java.util.Map;
+
+/**
+ * Creates DMaaP MR Publisher and Subscriber Instances for Integration Testing purposes
+ * <p>
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+public interface DMaaPMRCreator {
+
+ /**
+ * Provides {@link DMaaPMRSubscriber} instance for Integration testing configured for the specific test
+ * environment
+ *
+ * @return DMaaP MR Subscriber instance for integration testing
+ */
+ DMaaPMRSubscriber getDMaaPMRSubscriber();
+
+
+ /**
+ * Provides {@link DMaaPMRSubscriber} instance for Integration testing configured with given topic name
+ *
+ * @param subscriberTopicName DMaaP MR Subscriber Topic Name
+ * @return DMaaP MR Subscriber instance which is subscriber to given subscriber topic
+ */
+ DMaaPMRSubscriber getDMaaPMRSubscriberWithTopicName(String subscriberTopicName);
+
+ /**
+ * Provides {@link DMaaPMRPublisher} instance for Integration testing configured for the specific test
+ * environment
+ *
+ * @return DMaaP MR Publisher instance for integration testing
+ */
+ DMaaPMRPublisher getDMaaPMRPublisher();
+
+
+ /**
+ * Provides {@link DMaaPMRPublisher} instance for Integration testing configured with given topic name
+ *
+ * @param publisherTopicName DMaaP MR publisher topic name
+ * @return DMaaP MR Publisher instance for integration testing
+ */
+ DMaaPMRPublisher getDMaaPMRPublisherWithTopicName(String publisherTopicName);
+
+ /**
+ * Provides a map of DMaaP subscriber config for Integration testing configured with given topic name
+ *
+ * @return Map of key-value pair of subscriber config
+ */
+ Map<String, String> getDMaaPMRSubscriberConfig();
+
+ /**
+ * Provides a map of DMaaP publisher config for Integration testing configured with given topic name
+ *
+ * @return Map of key-value pair of publisher config
+ */
+ Map<String, String> getDMaaPMRPublisherConfig();
+
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreatorImpl.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreatorImpl.java
new file mode 100644
index 0000000..fed16ee
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/dmaap/DMaaPMRCreatorImpl.java
@@ -0,0 +1,225 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.dmaap;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+public class DMaaPMRCreatorImpl implements DMaaPMRCreator {
+
+ private final String subscriberHostName;
+
+ private final Integer subscriberHostPort;
+
+ private final String subscriberTopicName;
+
+ private final String subscriberProtocol;
+
+ private final String subscriberUserName;
+
+ private final String subscriberUserPassword;
+
+ private final String subscriberContentType;
+
+ private final String subscriberConsumerId;
+
+ private final String subscriberConsumerGroup;
+
+ private final Integer subscriberTimeoutMS;
+
+ private final Integer subscriberMessageLimit;
+
+ private final Integer subscriberPollingInterval;
+
+ // publisher preferences
+ private final String publisherHostName;
+
+ private final Integer publisherHostPort;
+
+ private final String publisherTopicName;
+
+ private final String publisherProtocol;
+
+ private final String publisherUserName;
+
+ private final String publisherUserPassword;
+
+ private final String publisherContentType;
+
+ private final Integer publisherPollingInterval;
+
+ private final Integer publisherMaxBatchSize;
+
+ private final Integer publisherMaxRecoveryQueueSize;
+
+
+ private final DMaaPMRFactory dMaaPMRFactory;
+
+ @Inject
+ public DMaaPMRCreatorImpl(@Named("dmaap.mr.subscriber.hostname") String subscriberHostName,
+ @Named("dmaap.mr.subscriber.portNumber") Integer subscriberHostPort,
+ @Named("dmaap.mr.subscriber.topicName") String subscriberTopicName,
+ @Named("dmaap.mr.subscriber.protocol") String subscriberProtocol,
+ @Named("dmaap.mr.subscriber.username") String subscriberUserName,
+ @Named("dmaap.mr.subscriber.userPassword") String subscriberUserPassword,
+ @Named("dmaap.mr.subscriber.contentType") String subscriberContentType,
+ @Named("dmaap.mr.subscriber.consumerId") String subscriberConsumerId,
+ @Named("dmaap.mr.subscriber.consumerGroup") String subscriberConsumerGroup,
+ @Named("dmaap.mr.subscriber.timeoutMS") Integer subscriberTimeoutMS,
+ @Named("dmaap.mr.subscriber.messageLimit") Integer subscriberMessageLimit,
+ @Named("dmaap.mr.subscriber.pollingInterval") Integer subscriberPollingInterval,
+ @Named("dmaap.mr.publisher.hostname") String publisherHostName,
+ @Named("dmaap.mr.publisher.portNumber") Integer publisherHostPort,
+ @Named("dmaap.mr.publisher.topicName") String publisherTopicName,
+ @Named("dmaap.mr.publisher.protocol") String publisherProtocol,
+ @Named("dmaap.mr.publisher.username") String publisherUserName,
+ @Named("dmaap.mr.publisher.userPassword") String publisherUserPassword,
+ @Named("dmaap.mr.publisher.contentType") String publisherContentType,
+ @Named("dmaap.mr.publisher.pollingInterval") Integer publisherPollingInterval,
+ @Named("dmaap.mr.publisher.maxBatchSize") Integer publisherMaxBatchSize,
+ @Named("dmaap.mr.publisher.maxRecoveryQueueSize") Integer publisherMaxRecoveryQueueSize) {
+ this.subscriberHostName = subscriberHostName;
+ this.subscriberHostPort = subscriberHostPort;
+ this.subscriberTopicName = subscriberTopicName;
+ this.subscriberProtocol = subscriberProtocol;
+ this.subscriberUserName = subscriberUserName;
+ this.subscriberUserPassword = subscriberUserPassword;
+ this.subscriberContentType = subscriberContentType;
+ this.subscriberConsumerId = subscriberConsumerId;
+ this.subscriberConsumerGroup = subscriberConsumerGroup;
+ this.subscriberTimeoutMS = subscriberTimeoutMS;
+ this.subscriberMessageLimit = subscriberMessageLimit;
+ this.subscriberPollingInterval = subscriberPollingInterval;
+ this.publisherHostName = publisherHostName;
+ this.publisherHostPort = publisherHostPort;
+ this.publisherTopicName = publisherTopicName;
+ this.publisherProtocol = publisherProtocol;
+ this.publisherUserName = publisherUserName;
+ this.publisherUserPassword = publisherUserPassword;
+ this.publisherContentType = publisherContentType;
+ this.publisherPollingInterval = publisherPollingInterval;
+ this.publisherMaxBatchSize = publisherMaxBatchSize;
+ this.publisherMaxRecoveryQueueSize = publisherMaxRecoveryQueueSize;
+
+ this.dMaaPMRFactory = DMaaPMRFactory.create();
+ }
+
+
+ @Override
+ public DMaaPMRSubscriber getDMaaPMRSubscriber() {
+ final DMaaPMRSubscriberConfig subscriberConfig =
+ new DMaaPMRSubscriberConfig.Builder(subscriberHostName, subscriberTopicName)
+ .setPortNumber(subscriberHostPort)
+ .setProtocol(subscriberProtocol)
+ .setUserName(subscriberUserName)
+ .setUserPassword(subscriberUserPassword)
+ .setContentType(subscriberContentType)
+ .setMessageLimit(subscriberMessageLimit)
+ .setTimeoutMS(subscriberTimeoutMS)
+ .setConsumerId(subscriberConsumerId)
+ .setConsumerGroup(subscriberConsumerGroup)
+ .build();
+ return dMaaPMRFactory.createSubscriber(subscriberConfig);
+ }
+
+ @Override
+ public DMaaPMRPublisher getDMaaPMRPublisher() {
+ final DMaaPMRPublisherConfig publisherConfig =
+ new DMaaPMRPublisherConfig.Builder(publisherHostName, publisherTopicName)
+ .setPortNumber(publisherHostPort)
+ .setProtocol(publisherProtocol)
+ .setUserName(publisherUserName)
+ .setUserPassword(publisherUserPassword)
+ .setContentType(publisherContentType)
+ .setMaxBatchSize(publisherMaxBatchSize)
+ .setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize)
+ .build();
+ return dMaaPMRFactory.createPublisher(publisherConfig);
+ }
+
+ @Override
+ public DMaaPMRSubscriber getDMaaPMRSubscriberWithTopicName(String subscriberTopicName) {
+ final DMaaPMRSubscriberConfig subscriberConfig =
+ new DMaaPMRSubscriberConfig.Builder(subscriberHostName, subscriberTopicName)
+ .setPortNumber(subscriberHostPort)
+ .setProtocol(subscriberProtocol)
+ .setUserName(subscriberUserName)
+ .setUserPassword(subscriberUserPassword)
+ .setContentType(subscriberContentType)
+ .setMessageLimit(subscriberMessageLimit)
+ .setTimeoutMS(subscriberTimeoutMS)
+ .setConsumerId(subscriberConsumerId)
+ .setConsumerGroup(subscriberConsumerGroup)
+ .build();
+ return dMaaPMRFactory.createSubscriber(subscriberConfig);
+ }
+
+
+ @Override
+ public DMaaPMRPublisher getDMaaPMRPublisherWithTopicName(String publisherTopicName) {
+ final DMaaPMRPublisherConfig publisherConfig =
+ new DMaaPMRPublisherConfig.Builder(publisherHostName, publisherTopicName)
+ .setPortNumber(publisherHostPort)
+ .setProtocol(publisherProtocol)
+ .setUserName(publisherUserName)
+ .setUserPassword(publisherUserPassword)
+ .setContentType(publisherContentType)
+ .setMaxBatchSize(publisherMaxBatchSize)
+ .setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize)
+ .build();
+ return dMaaPMRFactory.createPublisher(publisherConfig);
+ }
+
+ @Override
+ public Map<String, String> getDMaaPMRSubscriberConfig() {
+ Map<String, String> sourceConfigurationMap = new HashMap<>();
+ sourceConfigurationMap.put("referenceName", "source-referenceName");
+ sourceConfigurationMap.put("hostName", subscriberHostName);
+ sourceConfigurationMap.put("portNumber", subscriberHostPort.toString());
+ sourceConfigurationMap.put("topicName", subscriberTopicName);
+ sourceConfigurationMap.put("pollingInterval", subscriberPollingInterval.toString());
+ sourceConfigurationMap.put("protocol", subscriberProtocol);
+ sourceConfigurationMap.put("userName", subscriberUserName);
+ sourceConfigurationMap.put("userPassword", subscriberUserPassword);
+ sourceConfigurationMap.put("contentType", subscriberContentType);
+ sourceConfigurationMap.put("consumerId", subscriberConsumerId);
+ sourceConfigurationMap.put("consumerGroup", subscriberConsumerGroup);
+ sourceConfigurationMap.put("timeoutMS", subscriberTimeoutMS.toString());
+ sourceConfigurationMap.put("messageLimit", subscriberMessageLimit.toString());
+ return sourceConfigurationMap;
+ }
+
+ @Override
+ public Map<String, String> getDMaaPMRPublisherConfig() {
+ return null;
+ }
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/AnalyticsITInjectorSource.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/AnalyticsITInjectorSource.java
new file mode 100644
index 0000000..ae0e8bc
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/AnalyticsITInjectorSource.java
@@ -0,0 +1,41 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.module;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Stage;
+import cucumber.api.guice.CucumberModules;
+import cucumber.runtime.java.guice.InjectorSource;
+
+/**
+ * Provides Injector for Analytics Integration testing
+ *
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+public class AnalyticsITInjectorSource implements InjectorSource {
+
+ @Override
+ public Injector getInjector() {
+ return Guice.createInjector(Stage.PRODUCTION, CucumberModules.SCENARIO, new IntegrationTestModule());
+ }
+
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/IntegrationTestModule.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/IntegrationTestModule.java
new file mode 100644
index 0000000..c1f8a4e
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/module/IntegrationTestModule.java
@@ -0,0 +1,126 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.module;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.it.dmaap.DMaaPMRCreator;
+import org.openecomp.dcae.apod.analytics.it.dmaap.DMaaPMRCreatorImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/1/2017.
+ */
+public class IntegrationTestModule implements Module {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestModule.class);
+
+ public static final String ANALYTICS_SYSTEM_VARIABLE_KEY_NAME = "analytics.it.env";
+ public static final String DEFAULT_ENVIRONMENT = "dev";
+ public static final String ENVIRONMENT_PROPERTIES_FILE_LOCATION = "env";
+
+ @Override
+ public void configure(Binder binder) {
+ final Properties envProperties = loadPropertiesFile();
+ Names.bindProperties(binder, envProperties);
+ binder.bind(DMaaPMRCreator.class).to(DMaaPMRCreatorImpl.class);
+ }
+
+
+ /**
+ * Load environment specific properties file
+ *
+ * @return environment properties
+ */
+ private Properties loadPropertiesFile() {
+ final String currentEnvironment = getCurrentEnvironment().toLowerCase();
+ final String envPropertiesFileName = currentEnvironment + ".properties";
+ final Properties envProperties = new Properties();
+ final String fileLocation = ENVIRONMENT_PROPERTIES_FILE_LOCATION + "/" + envPropertiesFileName;
+ LOG.info("===>>> EFFECTIVE ENV: {}, EFFECTIVE PROPERTIES FILE: {} <<<====", currentEnvironment, fileLocation);
+ try {
+ final InputStream propertiesFileInputStream =
+ IntegrationTestModule.class.getClassLoader().getResourceAsStream(fileLocation);
+ envProperties.load(propertiesFileInputStream);
+ } catch (FileNotFoundException e) {
+ final String errorMessage = String.format("Unable to find env properties file: %s.", fileLocation);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ } catch (IOException e) {
+ final String errorMessage = String.format("I/O Exception during loading env properties file: %s",
+ fileLocation);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ final Properties systemProperties = System.getProperties();
+ for (Object envProperty : envProperties.keySet()) {
+ final String systemPropertyValue = systemProperties.getProperty(envProperty.toString());
+ if (systemPropertyValue != null) {
+ LOG.info("Overriding System property name: {} with env property value: {}",
+ envProperty.toString(), systemPropertyValue);
+ envProperties.setProperty(envProperty.toString(), systemPropertyValue);
+ }
+ }
+
+ LOG.info("Printing Effective Environment Properties =============== >>>");
+ for (Map.Entry<Object, Object> envPropertyEntry : envProperties.entrySet()) {
+ LOG.info("{}={}", envPropertyEntry.getKey(), envPropertyEntry.getValue());
+ }
+
+ return envProperties;
+ }
+
+
+ private static String getCurrentEnvironment() {
+ // First look in environment variables
+ LOG.info("Looking for IT variable name: {} in Environment variables", ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+ final String itEnvironmentVariable = System.getenv(ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+ if (itEnvironmentVariable != null) {
+ LOG.info("Found value in Environment variables: {} for IT Environment variable", itEnvironmentVariable);
+ return itEnvironmentVariable;
+ } else {
+ LOG.info("Unable to find IT variable name: {} in Environment variable", ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+ }
+
+ // Second look inside system properties
+ LOG.info("Looking for IT variable name: {} in System variables", ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+
+ final String itSystemProperty = System.getProperty(ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+ if (itSystemProperty != null) {
+ LOG.info("Found value for System variables: {} in System variable", itSystemProperty);
+ return itSystemProperty;
+ } else {
+ LOG.info("Unable to find IT variable name: {} in System variable", ANALYTICS_SYSTEM_VARIABLE_KEY_NAME);
+ }
+
+ // return default enviroment
+ LOG.warn("Unable to find IT environment variable. Choosing default environment: {}", DEFAULT_ENVIRONMENT);
+ return DEFAULT_ENVIRONMENT;
+ }
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/BaseAnalyticsPluginsIT.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/BaseAnalyticsPluginsIT.java
new file mode 100644
index 0000000..a052c63
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/BaseAnalyticsPluginsIT.java
@@ -0,0 +1,51 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.plugins;
+
+import co.cask.cdap.etl.mock.test.HydratorTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.of;
+
+/**
+ * Base class for all the Hydrator test base , where the utility and common code for integration testing is written
+ * <p/>
+ * @author Manjesh Gowda. Creation Date: 2/3/2017.
+ */
+
+public abstract class BaseAnalyticsPluginsIT extends HydratorTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseAnalyticsPluginsIT.class);
+
+ /**
+ * Provides two simple messages for testing of Plugin
+ * <p/>
+ *
+ * @return
+ */
+ protected static List<String> getTwoSampleMessage() {
+ String message1 = "{ \"message\" : \"Test Message 1 from DMaaP source Plugin\"}";
+ String message2 = "{ \"message\" : \"Test Message 2 from DMaaP source Plugin\"}";
+ return of(message1, message2);
+ }
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/DMaaPMRSourcePluginIT.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/DMaaPMRSourcePluginIT.java
new file mode 100644
index 0000000..abffdb7
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/plugins/DMaaPMRSourcePluginIT.java
@@ -0,0 +1,204 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.plugins;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.dataset.table.Table;
+import co.cask.cdap.api.plugin.PluginClass;
+import co.cask.cdap.api.plugin.PluginPropertyField;
+import co.cask.cdap.common.utils.Tasks;
+import co.cask.cdap.datastreams.DataStreamsApp;
+import co.cask.cdap.datastreams.DataStreamsSparkLauncher;
+import co.cask.cdap.etl.api.streaming.StreamingSource;
+import co.cask.cdap.etl.mock.batch.MockSink;
+import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
+import co.cask.cdap.etl.proto.v2.ETLPlugin;
+import co.cask.cdap.etl.proto.v2.ETLStage;
+import co.cask.cdap.proto.artifact.AppRequest;
+import co.cask.cdap.proto.artifact.ArtifactSummary;
+import co.cask.cdap.proto.id.ApplicationId;
+import co.cask.cdap.proto.id.ArtifactId;
+import co.cask.cdap.proto.id.NamespaceId;
+import co.cask.cdap.test.ApplicationManager;
+import co.cask.cdap.test.DataSetManager;
+import co.cask.cdap.test.SparkManager;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRReceiver;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRSource;
+import org.openecomp.dcae.apod.analytics.it.dmaap.DMaaPMRCreator;
+import org.openecomp.dcae.apod.analytics.it.module.AnalyticsITInjectorSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Performs integration testing on DMaaP source plugin , where 2 sample messages are posted and verified
+ * <p/>
+ * @author Manjesh Gowda. Creation Date: 2/3/2017.
+ */
+public class DMaaPMRSourcePluginIT extends BaseAnalyticsPluginsIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSourcePluginIT.class);
+ protected static final ArtifactId DATASTREAMS_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-streams", "3.2.0");
+ protected static final ArtifactSummary DATASTREAMS_ARTIFACT = new ArtifactSummary("data-streams", "3.2.0");
+
+ /**
+ * Streaming artifacts are added to the hydrator pipeline. Important. Make sure you explicitly add all the custom
+ * class that you have written in the plugin artifact, if not you will get incompatible type error
+ *
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class);
+
+// Set<ArtifactRange> parents = ImmutableSet.of(
+// new ArtifactRange(NamespaceId.DEFAULT.toId(), DATASTREAMS_ARTIFACT_ID.getArtifact(),
+// new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true,
+// new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true)
+// );
+
+ ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
+ "dcae-analytics-cdap-plugins", "2.0-SNAPSHOT");
+ addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATASTREAMS_ARTIFACT_ID, ImmutableSet.of(getPluginClass()),
+ DMaaPMRSource.class, DMaaPMRSourcePluginConfig.class, DMaaPMRReceiver.class);
+
+// addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), parents,
+// DMaaPMRSource.class, DMaaPMRReceiver.class, DMaaPMRSourcePluginConfig.class);
+ }
+
+ private static PluginClass getPluginClass() {
+ HashMap<String, PluginPropertyField> properties = new HashMap<>();
+ properties.put("referenceName", new PluginPropertyField("referenceName", "", "string", false, false));
+ properties.put("hostName", new PluginPropertyField("hostName", "", "string", false, false));
+ properties.put("topicName", new PluginPropertyField("topicName", "", "string", false, false));
+ properties.put("protocol", new PluginPropertyField("protocol", "", "string", false, false));
+ properties.put("userName", new PluginPropertyField("userName", "", "string", false, false));
+ properties.put("userPassword", new PluginPropertyField("userPassword", "", "string", false, false));
+ properties.put("contentType", new PluginPropertyField("contentType", "", "string", false, false));
+ properties.put("consumerId", new PluginPropertyField("consumerId", "", "string", false, false));
+ properties.put("consumerGroup", new PluginPropertyField("consumerGroup", "", "string", false, false));
+
+ properties.put("portNumber", new PluginPropertyField("portNumber", "", "long", false, false));
+ properties.put("timeoutMS", new PluginPropertyField("timeoutMS", "", "long", false, false));
+ properties.put("messageLimit", new PluginPropertyField("messageLimit", "", "long", false, false));
+ properties.put("pollingInterval", new PluginPropertyField("pollingInterval", "", "long", false, false));
+
+ return new PluginClass("streamingsource", "DMaaPMRSource", "", DMaaPMRSource.class.getName(),
+ "pluginConfig", properties);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ }
+
+ /**
+ * Build a pipeline with a mock-sink. After that publish coupe of messages to the subscriber topic, and verify in
+ * the mock sink
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDMaaPMRSourcePlugin() throws Exception {
+ AnalyticsITInjectorSource analyticsITInjectorSource = new AnalyticsITInjectorSource();
+
+ final DMaaPMRCreator dMaaPMRCreator = analyticsITInjectorSource.getInjector().getInstance(DMaaPMRCreator.class);
+ Map<String, String> dmaapSourceProperties = dMaaPMRCreator.getDMaaPMRSubscriberConfig();
+ dmaapSourceProperties.put("consumerId", UUID.randomUUID().toString().replace("-", ""));
+ dmaapSourceProperties.put("consumerGroup", UUID.randomUUID().toString().replace("-", ""));
+ final String subscriberTopicName = dmaapSourceProperties.get("topicName");
+
+ DataStreamsConfig dmaaPMRSourcePipeline = DataStreamsConfig.builder()
+ .addStage(new ETLStage("source", new ETLPlugin(
+ "DMaaPMRSource", StreamingSource.PLUGIN_TYPE, dmaapSourceProperties, null)))
+ .addStage(new ETLStage("sink", MockSink.getPlugin("dmaapOutput")))
+ .addConnection("source", "sink")
+ .setBatchInterval("20s")
+ .build();
+
+ AppRequest<DataStreamsConfig> appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, dmaaPMRSourcePipeline);
+ ApplicationId appId = NamespaceId.DEFAULT.app("DMaaPMRSourceIntegrationTestingApp");
+ ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
+
+ SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME);
+ sparkManager.start();
+ sparkManager.waitForStatus(true, 1, 100);
+
+ final DataSetManager<Table> outputManager = getDataset("dmaapOutput");
+ final List<String> dmaapContents = new LinkedList<>();
+
+ // Publish message to source
+
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ TimeUnit.MILLISECONDS.sleep(30000);
+ dMaaPMRCreator.getDMaaPMRPublisherWithTopicName(subscriberTopicName).publish(getTwoSampleMessage());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+ try {
+ Tasks.waitFor(true, new Callable<Boolean>() {
+ boolean initialized = false;
+
+ @Override
+ public Boolean call() throws Exception {
+ try {
+ outputManager.flush();
+ for (StructuredRecord record : MockSink.readOutput(outputManager)) {
+ dmaapContents.add((String) record.get("message"));
+ }
+ return dmaapContents.size() >= 2;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ },
+ 90, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ sparkManager.stop();
+
+ Assert.assertTrue(dmaapContents.size() == 2);
+ String allMessages = Joiner.on(",").join(dmaapContents);
+ Assert.assertTrue(allMessages.contains("Message 1"));
+ Assert.assertTrue(allMessages.contains("Message 2"));
+ }
+}
diff --git a/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/util/StepUtils.java b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/util/StepUtils.java
new file mode 100644
index 0000000..45324a7
--- /dev/null
+++ b/dcae-analytics-it/src/test/java/org/openecomp/dcae/apod/analytics/it/util/StepUtils.java
@@ -0,0 +1,43 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.it.util;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/2/2017.
+ */
+public abstract class StepUtils {
+
+
+ /**
+ * Determines if step should use default publisher topic as configured in environment properties file
+ *
+ * @param topicName step passed topic name
+ * @return true if default publisher topic
+ */
+ public static boolean isDefaultPublisherTopic(final String topicName) {
+ if (topicName.equalsIgnoreCase("default")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
diff --git a/dcae-analytics-it/src/test/resources/cucumber.properties b/dcae-analytics-it/src/test/resources/cucumber.properties
new file mode 100644
index 0000000..0773e97
--- /dev/null
+++ b/dcae-analytics-it/src/test/resources/cucumber.properties
@@ -0,0 +1,21 @@
+#
+# ===============================LICENSE_START======================================
+# dcae-analytics
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================LICENSE_END===========================================
+#
+
+guice.injector-source=org.openecomp.dcae.apod.analytics.it.module.AnalyticsITInjectorSource
diff --git a/dcae-analytics-it/src/test/resources/cucumber/features/dmaap/message_router.feature b/dcae-analytics-it/src/test/resources/cucumber/features/dmaap/message_router.feature
new file mode 100644
index 0000000..708c376
--- /dev/null
+++ b/dcae-analytics-it/src/test/resources/cucumber/features/dmaap/message_router.feature
@@ -0,0 +1,10 @@
+@dmaapMR
+Feature: Verify DMaaP MR Functioning
+
+ Scenario: DMaaP MR Publisher can publish message and DMaaP MR Subscriber can fetch same message
+ Given DMaaP MR Service is up
+ When I publish json message to publisher topic name "default" in file "data/json_message.json"
+ And wait for "10" seconds
+ And subscriber fetch message from publisher topic name "default"
+ And compare fetched json message with published message
+ Then fetched message must be same as published message
diff --git a/dcae-analytics-it/src/test/resources/data/json_message.json b/dcae-analytics-it/src/test/resources/data/json_message.json
new file mode 100644
index 0000000..53d029b
--- /dev/null
+++ b/dcae-analytics-it/src/test/resources/data/json_message.json
@@ -0,0 +1,3 @@
+{
+ "message": "cucumber automated testing message"
+}
diff --git a/dcae-analytics-it/src/test/resources/env/dev.properties b/dcae-analytics-it/src/test/resources/env/dev.properties
new file mode 100644
index 0000000..d889622
--- /dev/null
+++ b/dcae-analytics-it/src/test/resources/env/dev.properties
@@ -0,0 +1,46 @@
+#
+# ===============================LICENSE_START======================================
+# dcae-analytics
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================LICENSE_END===========================================
+#
+
+#DMaaP MR Subscriber properties
+dmaap.mr.subscriber.hostname=mrlocal-mtnjftle01.homer.com
+dmaap.mr.subscriber.portNumber=3905
+dmaap.mr.subscriber.topicName=com.dcae.dmaap.dev.DcaeTestVESSub
+dmaap.mr.subscriber.protocol=https
+dmaap.mr.subscriber.username=USER
+dmaap.mr.subscriber.userPassword=PASSWORD
+dmaap.mr.subscriber.contentType=application/json
+dmaap.mr.subscriber.timeoutMS=-1
+dmaap.mr.subscriber.messageLimit=-1
+dmaap.mr.subscriber.pollingInterval=10000
+dmaap.mr.subscriber.consumerId=C1
+dmaap.mr.subscriber.consumerGroup=DCAEAnalytics-G1
+
+
+#DMaaP MR Publisher properties
+dmaap.mr.publisher.hostname=mrlocal-mtnjftle01.homer.com
+dmaap.mr.publisher.portNumber=3905
+dmaap.mr.publisher.topicName=com.dcae.dmaap.dev.DcaeTestVESPub
+dmaap.mr.publisher.protocol=https
+dmaap.mr.publisher.username=USER
+dmaap.mr.publisher.userPassword=PASSWORD
+dmaap.mr.publisher.contentType=application/json
+dmaap.mr.publisher.pollingInterval=10000
+dmaap.mr.publisher.maxBatchSize=1
+dmaap.mr.publisher.maxRecoveryQueueSize=100000
diff --git a/dcae-analytics-it/src/test/resources/logback-test.xml b/dcae-analytics-it/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..255532f
--- /dev/null
+++ b/dcae-analytics-it/src/test/resources/logback-test.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ ===============================LICENSE_START======================================
+ ~ dcae-analytics
+ ~ ================================================================================
+ ~ Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============================LICENSE_END===========================================
+ -->
+<configuration debug="false">
+
+ <!--
+ Disabling some chatty loggers.
+ -->
+ <logger name="org.apache.commons.beanutils" level="ERROR"/>
+ <logger name="org.apache.zookeeper.server" level="ERROR"/>
+ <logger name="org.apache.zookeeper" level="ERROR"/>
+ <logger name="com.ning" level="WARN"/>
+ <logger name="org.apache.spark" level="WARN"/>
+ <logger name="org.spark-project" level="WARN"/>
+ <logger name="org.apache.hadoop" level="WARN"/>
+ <logger name="org.apache.hive" level="WARN"/>
+ <logger name="org.quartz.core" level="WARN"/>
+ <logger name="org.eclipse.jetty" level="WARN"/>
+ <logger name="io.netty.util.internal" level="WARN"/>
+
+ <logger name="org.apache.twill" level="WARN"/>
+ <logger name="co.cask.cdap" level="WARN"/>
+ <logger name="org.openecomp.dcae.apod.analytics" level="DEBUG"/>
+
+ <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="ERROR">
+ <appender-ref ref="Console"/>
+ </root>
+
+
+</configuration>
+