/*
 * ===============================LICENSE_START======================================
 * dcae-analytics
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============================LICENSE_END===========================================
 */

package org.onap.dcae.apod.analytics.it.plugins;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.plugin.PluginClass;
import co.cask.cdap.api.plugin.PluginPropertyField;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.datastreams.DataStreamsApp;
import co.cask.cdap.datastreams.DataStreamsSparkLauncher;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.mock.batch.MockSink;
import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.v2.ETLStage;
import co.cask.cdap.proto.artifact.AppRequest;
import co.cask.cdap.proto.artifact.ArtifactSummary;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.ArtifactId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.SparkManager;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
import org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRReceiver;
import org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRSource;
import org.onap.dcae.apod.analytics.it.dmaap.DMaaPMRCreator;
import org.onap.dcae.apod.analytics.it.module.AnalyticsITInjectorSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
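/*
 * Test harness overview (the CDAP Hydrator integration-test pattern used below):
 *   1. Register the DataStreams app artifact and this project's plugin artifact (setupTest).
 *   2. Build a DataStreamsConfig pipeline wiring DMaaPMRSource into a MockSink.
 *   3. Deploy the app and start the pipeline via DataStreamsSparkLauncher.
 *   4. Publish messages to the DMaaP MR topic and poll the MockSink dataset for them.
 */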
/**
 * Performs integration testing on the DMaaP MR source plugin: two sample messages are published
 * to the subscriber topic and then verified in a mock sink.
 *
 * @author Manjesh Gowda. Creation Date: 2/3/2017.
 */
public class DMaaPMRSourcePluginIT extends BaseAnalyticsPluginsIT {

    private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSourcePluginIT.class);

    protected static final ArtifactId DATASTREAMS_ARTIFACT_ID =
            NamespaceId.DEFAULT.artifact("data-streams", "3.2.0");
    protected static final ArtifactSummary DATASTREAMS_ARTIFACT =
            new ArtifactSummary("data-streams", "3.2.0");

    /**
     * Adds the streaming artifacts to the hydrator pipeline. Important: make sure you explicitly add all the
     * custom classes you have written to the plugin artifact; if not, you will get an incompatible type error.
     *
     * @throws Exception on artifact setup failure
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class);

        // Set<ArtifactRange> parents = ImmutableSet.of(
        //         new ArtifactRange(NamespaceId.DEFAULT.toId(), DATASTREAMS_ARTIFACT_ID.getArtifact(),
        //                 new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true,
        //                 new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true)
        // );

        ArtifactId dcaeAnalyticsCdapPluginsArtifact =
                NamespaceId.DEFAULT.artifact("dcae-analytics-cdap-plugins", "2.0-SNAPSHOT");

        addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATASTREAMS_ARTIFACT_ID,
                ImmutableSet.of(getPluginClass()),
                DMaaPMRSource.class, DMaaPMRSourcePluginConfig.class, DMaaPMRReceiver.class);

        // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), parents,
        //         DMaaPMRSource.class, DMaaPMRReceiver.class, DMaaPMRSourcePluginConfig.class);
    }

    private static PluginClass getPluginClass() {
        // Each entry below must mirror a field of DMaaPMRSourcePluginConfig so that the
        // manually registered PluginClass matches the real plugin configuration.
        HashMap<String, PluginPropertyField> properties = new HashMap<>();
        properties.put("referenceName", new PluginPropertyField("referenceName", "", "string", false, false));
        properties.put("hostName", new PluginPropertyField("hostName", "", "string", false, false));
        properties.put("topicName", new PluginPropertyField("topicName", "", "string", false, false));
        properties.put("protocol", new PluginPropertyField("protocol", "", "string", false, false));
        properties.put("userName", new PluginPropertyField("userName", "", "string", false, false));
        properties.put("userPassword", new PluginPropertyField("userPassword", "", "string", false, false));
        properties.put("contentType", new PluginPropertyField("contentType", "", "string", false, false));
        properties.put("consumerId", new PluginPropertyField("consumerId", "", "string", false, false));
        properties.put("consumerGroup", new PluginPropertyField("consumerGroup", "", "string", false, false));
        properties.put("portNumber", new PluginPropertyField("portNumber", "", "long", false, false));
        properties.put("timeoutMS", new PluginPropertyField("timeoutMS", "", "long", false, false));
        properties.put("messageLimit", new PluginPropertyField("messageLimit", "", "long", false, false));
        properties.put("pollingInterval", new PluginPropertyField("pollingInterval", "", "long", false, false));
        return new PluginClass("streamingsource", "DMaaPMRSource", "", DMaaPMRSource.class.getName(),
                "pluginConfig", properties);
    }

    @AfterClass
    public static void cleanup() {
    }
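    /*
     * Timing assumptions for the test below: the pipeline batch interval is 20s, the publisher
     * thread sleeps 30s before posting (presumably so the Spark streaming job is already polling
     * when the messages arrive), and Tasks.waitFor polls the mock sink for up to 90s before
     * giving up. Shrinking these windows is likely to make the test flaky.
     */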
    /**
     * Builds a pipeline with a mock sink, then publishes a couple of messages to the subscriber
     * topic and verifies them in the mock sink.
     *
     * @throws Exception on pipeline deployment or verification failure
     */
    @Test
    public void testDMaaPMRSourcePlugin() throws Exception {
        AnalyticsITInjectorSource analyticsITInjectorSource = new AnalyticsITInjectorSource();
        final DMaaPMRCreator dMaaPMRCreator =
                analyticsITInjectorSource.getInjector().getInstance(DMaaPMRCreator.class);
        Map<String, String> dmaapSourceProperties = dMaaPMRCreator.getDMaaPMRSubscriberConfig();
        dmaapSourceProperties.put("consumerId", UUID.randomUUID().toString().replace("-", ""));
        dmaapSourceProperties.put("consumerGroup", UUID.randomUUID().toString().replace("-", ""));
        final String subscriberTopicName = dmaapSourceProperties.get("topicName");

        DataStreamsConfig dmaaPMRSourcePipeline = DataStreamsConfig.builder()
                .addStage(new ETLStage("source", new ETLPlugin(
                        "DMaaPMRSource", StreamingSource.PLUGIN_TYPE, dmaapSourceProperties, null)))
                .addStage(new ETLStage("sink", MockSink.getPlugin("dmaapOutput")))
                .addConnection("source", "sink")
                .setBatchInterval("20s")
                .build();

        AppRequest<DataStreamsConfig> appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, dmaaPMRSourcePipeline);
        ApplicationId appId = NamespaceId.DEFAULT.app("DMaaPMRSourceIntegrationTestingApp");
        ApplicationManager appManager = deployApplication(appId.toId(), appRequest);

        SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME);
        sparkManager.start();
        sparkManager.waitForStatus(true, 1, 100);

        final DataSetManager<Table> outputManager = getDataset("dmaapOutput");
        final List<String> dmaapContents = new LinkedList<>();

        // Publish messages to the source topic on a separate thread, after giving the
        // streaming pipeline time to start polling.
        new Thread() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(30000);
                    dMaaPMRCreator.getDMaaPMRPublisherWithTopicName(subscriberTopicName)
                            .publish(getTwoSampleMessage());
                } catch (InterruptedException e) {
                    LOG.error("Publisher thread interrupted before messages were posted", e);
                }
            }
        }.start();

        try {
            Tasks.waitFor(true, new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    try {
                        outputManager.flush();
                        for (StructuredRecord record : MockSink.readOutput(outputManager)) {
                            dmaapContents.add((String) record.get("message"));
                        }
                        return dmaapContents.size() >= 2;
                    } catch (Exception e) {
                        LOG.error("Error while reading mock sink output", e);
                        return false;
                    }
                }
            }, 90, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error("Timed out waiting for messages in the mock sink", e);
        }

        sparkManager.stop();

        Assert.assertEquals(2, dmaapContents.size());
        String allMessages = Joiner.on(",").join(dmaapContents);
        Assert.assertTrue(allMessages.contains("Message 1"));
        Assert.assertTrue(allMessages.contains("Message 2"));
    }
}