aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae')
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java238
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java77
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java75
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java62
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java95
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java80
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java84
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java76
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java84
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java50
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java56
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java63
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java229
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java119
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java75
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java91
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java81
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java74
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java58
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java84
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java171
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java57
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java64
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java86
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java85
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java107
-rw-r--r--dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java157
27 files changed, 2578 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java
new file mode 100644
index 0000000..e9b4e6f
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java
@@ -0,0 +1,238 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins;
+
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.StageMetrics;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import org.apache.hadoop.conf.Configuration;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPPluginConstants;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
+import org.onap.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;
+import org.onap.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {
+
+ protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =
+ Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();
+
+ protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";
+ protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";
+ protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =
+ "data/json/cef/cef_message_with_threshold_violation.json";
+
+
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
+ protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";
+ protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";
+ protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";
+ protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;
+ protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;
+
+
+ protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";
+ protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
+ protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;
+ protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";
+ protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";
+ protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";
+ protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";
+ protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";
+ protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";
+ protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;
+ protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;
+
+ protected static final String VES_MESSAGE_FIELD_NAME = "message";
+ protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";
+ protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";
+
+
+ protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";
+ protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";
+ protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";
+ protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =
+ "$.event.commonEventHeader.domain:measurementsForVfScaling," +
+ "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall";
+ protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\",\"fields\":" +
+ "[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}" +
+ "]" +
+ "}";
+
+ protected static class MockStageMetrics implements StageMetrics, Serializable {
+
+ @Override
+ public void count(String metricName, int delta) {
+ LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);
+ }
+
+ @Override
+ public void gauge(String metricName, long value) {
+ LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);
+ }
+
+ @Override
+ public void pipelineCount(String metricName, int delta) {
+ LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);
+ }
+
+ @Override
+ public void pipelineGauge(String metricName, long value) {
+ LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);
+ }
+ }
+
+ protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {
+ final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();
+ sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);
+ sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);
+ sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);
+ sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);
+ sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);
+ sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);
+ sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);
+ sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);
+ sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);
+ sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);
+ sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);
+ sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);
+ sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);
+ return sourcePluginConfig;
+ }
+
+ protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {
+ final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();
+ sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);
+ sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);
+ sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);
+ sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
+ sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);
+ sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);
+ sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);
+ sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
+ sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);
+ sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);
+ sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);
+ return sinkPluginConfig;
+ }
+
+
+ protected static Configuration getTestConfiguration() {
+ final Configuration configuration = new Configuration();
+ final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();
+ for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {
+ configuration.set(property.getKey(), property.getValue());
+ }
+ return configuration;
+ }
+
+ protected static Map<String, String> createSinkConfigurationMap() {
+
+ Map<String, String> sinkConfig = new LinkedHashMap<>();
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
+ DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,
+ DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
+ DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());
+ sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
+ DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());
+ return sinkConfig;
+ }
+
+ protected static Schema getDMaaPMRSinkTestSchema() {
+ return Schema.recordOf(
+ "DMaaPMRSinkTestSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("field1", Schema.of(Schema.Type.STRING))
+ );
+ }
+
+
+ protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {
+ final String policyJson;
+ try {
+ policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while parsing policy", e);
+ }
+ return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,
+ TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);
+ }
+
+ protected static Schema getSimpleTCAPluginInputSchema() {
+ return Schema.recordOf(
+ "TestSimpleTCAPluginInputSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
+ );
+ }
+
+ protected static Schema getJsonFilterPluginInputSchema() {
+ return Schema.recordOf(
+ "TestJsonFilterInputSchema",
+ Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
+ Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING))
+ );
+ }
+
+ protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {
+ return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,
+ JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,
+ JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,
+ JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,
+ JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java
new file mode 100644
index 0000000..79c7698
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java
@@ -0,0 +1,77 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMROutputFormatProviderTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testDMaaPMROutputFormatProviderWhenConfigIsMissingNonRequiredValues() throws Exception {
+ final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();
+ sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);
+ sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
+ final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider =
+ new DMaaPMROutputFormatProvider(sinkPluginConfig);
+ final Map<String, String> outputFormatConfiguration =
+ dMaaPMROutputFormatProvider.getOutputFormatConfiguration();
+ final String hostName = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);
+ assertTrue(hostName.equals(DMAAP_MR_SINK_PLUGIN_HOST_NAME));
+ final String topicName = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);
+ assertTrue(topicName.equals(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME));
+ final String portNumber = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);
+ assertTrue(portNumber.equals(AnalyticsConstants.DEFAULT_PORT_NUMBER.toString()));
+ final String protocol = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);
+ assertTrue(protocol.equals(AnalyticsConstants.DEFAULT_PROTOCOL));
+ }
+
+ @Test
+ public void testGetOutputFormatClassName() throws Exception {
+ final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider =
+ new DMaaPMROutputFormatProvider(getTestDMaaPMRSinkPluginConfig());
+ final String outputFormatClassName = dMaaPMROutputFormatProvider.getOutputFormatClassName();
+ assertTrue(outputFormatClassName.equals(DMaaPMROutputFormat.class.getName()));
+ }
+
+ @Test
+ public void testGetOutputFormatConfiguration() throws Exception {
+ final TestDMaaPMRSinkPluginConfig testDMaaPMRSinkPluginConfig = getTestDMaaPMRSinkPluginConfig();
+ final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider =
+ new DMaaPMROutputFormatProvider(testDMaaPMRSinkPluginConfig);
+ final Map<String, String> outputFormatConfiguration =
+ dMaaPMROutputFormatProvider.getOutputFormatConfiguration();
+ assertTrue(outputFormatConfiguration.size() == 9);
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java
new file mode 100644
index 0000000..4b111a3
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java
@@ -0,0 +1,75 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMROutputFormatTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private DMaaPMROutputFormat dMaaPMROutputFormat;
+
+ @Before
+ public void before() {
+ dMaaPMROutputFormat = new DMaaPMROutputFormat();
+ }
+
+ @Test
+ public void testGetRecordWriter() throws Exception {
+ final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class);
+ when(taskAttemptContext.getConfiguration()).thenReturn(getTestConfiguration());
+ final RecordWriter<String, NullWritable> recordWriter = dMaaPMROutputFormat.getRecordWriter(taskAttemptContext);
+ assertNotNull(recordWriter);
+ final JobContext jobContext = Mockito.mock(JobContext.class);
+ dMaaPMROutputFormat.checkOutputSpecs(jobContext);
+ }
+
+ @Test
+ public void testGetOutputCommitter() throws Exception {
+ final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class);
+ final OutputCommitter outputCommitter = dMaaPMROutputFormat.getOutputCommitter(taskAttemptContext);
+ assertTrue(outputCommitter.getClass().equals(DMaaPMROutputFormat.NoOpOutputCommitter.class));
+ final JobContext jobContext = Mockito.mock(JobContext.class);
+ outputCommitter.setupJob(jobContext);
+ outputCommitter.setupTask(taskAttemptContext);
+ assertFalse(outputCommitter.needsTaskCommit(taskAttemptContext));
+ outputCommitter.commitJob(jobContext);
+ outputCommitter.commitTask(taskAttemptContext);
+ outputCommitter.abortTask(taskAttemptContext);
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java
new file mode 100644
index 0000000..3d79057
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java
@@ -0,0 +1,62 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+
+import java.util.Arrays;
+
+import static org.mockito.Mockito.times;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRRecordWriterTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private DMaaPMRPublisher publisher;
+ private DMaaPMRRecordWriter dMaaPMRRecordWriter;
+
+ @Before
+ public void before() {
+ publisher = Mockito.mock(DMaaPMRPublisher.class);
+ dMaaPMRRecordWriter = new DMaaPMRRecordWriter(publisher);
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ final String testMessage = "test Message";
+ dMaaPMRRecordWriter.write(testMessage, null);
+ Mockito.verify(publisher, times(1)).publish(Arrays.asList(testMessage));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class);
+ dMaaPMRRecordWriter.close(taskAttemptContext);
+ Mockito.verify(publisher, times(1)).flush();
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java
new file mode 100644
index 0000000..d649e6e
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java
@@ -0,0 +1,95 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.Emitter;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.StageConfigurer;
+import co.cask.cdap.etl.api.batch.BatchSinkContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRSinkTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private DMaaPMRSink dMaaPMRSink;
+
+ @Before
+ public void before() {
+ dMaaPMRSink = new DMaaPMRSink(getTestDMaaPMRSinkPluginConfig());
+ }
+
+ @Test
+ public void testConfigurePipeline() throws Exception {
+ final PipelineConfigurer pipelineConfigurer = Mockito.mock(PipelineConfigurer.class);
+ final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class);
+ when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
+ when(stageConfigurer.getInputSchema()).thenReturn(getDMaaPMRSinkTestSchema());
+ dMaaPMRSink.configurePipeline(pipelineConfigurer);
+ verify(stageConfigurer, times(1)).getInputSchema();
+ }
+
+ @Test(expected = CDAPSettingsException.class)
+ public void testConfigurePipelineWithInvalidSchema() throws Exception {
+ final PipelineConfigurer pipelineConfigurer = Mockito.mock(PipelineConfigurer.class);
+ final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class);
+ when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
+ when(stageConfigurer.getInputSchema()).thenReturn(Schema.recordOf(
+ "DMaaPMRSinkInvalidSchema",
+ Schema.Field.of("message1", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("field1", Schema.of(Schema.Type.STRING))
+ ));
+ dMaaPMRSink.configurePipeline(pipelineConfigurer);
+ }
+
+ @Test
+ public void testPrepareRun() throws Exception {
+ final BatchSinkContext batchSinkContext = Mockito.mock(BatchSinkContext.class);
+ dMaaPMRSink.prepareRun(batchSinkContext);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTransform() throws Exception {
+ final StructuredRecord structuredRecord = Mockito.mock(StructuredRecord.class);
+ final Emitter emitter = Mockito.mock(Emitter.class);
+ final String incomingTestMessage = "test message";
+ when(structuredRecord.get(
+ eq(getTestDMaaPMRSinkPluginConfig().getMessageColumnName()))).thenReturn(incomingTestMessage);
+ doNothing().when(emitter).emit(any());
+ dMaaPMRSink.transform(structuredRecord, emitter);
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java
new file mode 100644
index 0000000..5c80baa
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java
@@ -0,0 +1,80 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public class DMaaPMRSinkPluginConfigTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ @Test
+ public void testDMaaPMRSinkPluginConfigDefaults() throws Exception {
+ final DMaaPMRSinkPluginConfig sinkPluginConfig = new DMaaPMRSinkPluginConfig
+ (DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME,
+ DMAAP_MR_SINK_PLUGIN_TOPIC_NAME, DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);
+
+ assertThat(sinkPluginConfig.getReferenceName(), is(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME));
+ assertThat(sinkPluginConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME));
+ assertThat(sinkPluginConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME));
+ assertThat(sinkPluginConfig.getMessageColumnName(), is(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME));
+ assertNull(sinkPluginConfig.getPortNumber());
+ assertNull(sinkPluginConfig.getProtocol());
+ assertNull(sinkPluginConfig.getUserName());
+ assertNull(sinkPluginConfig.getUserPassword());
+ assertNull(sinkPluginConfig.getContentType());
+ assertNull(sinkPluginConfig.getMaxBatchSize());
+ assertNull(sinkPluginConfig.getMaxRecoveryQueueSize());
+ }
+
+ @Test
+ public void testDMaaPMRSinkPluginConfigCustom() throws Exception {
+ final DMaaPMRSinkPluginConfig sinkPluginConfig = getTestDMaaPMRSinkPluginConfig();
+ assertThat(sinkPluginConfig.getReferenceName(), is(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME));
+ assertThat(sinkPluginConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME));
+ assertThat(sinkPluginConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME));
+ assertThat(sinkPluginConfig.getPortNumber(), is(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER));
+ assertThat(sinkPluginConfig.getProtocol(), is(DMAAP_MR_SINK_PLUGIN_PROTOCOL));
+ assertThat(sinkPluginConfig.getUserName(), is(DMAAP_MR_SINK_PLUGIN_USERNAME));
+ assertThat(sinkPluginConfig.getUserPassword(), is(DMAAP_MR_SINK_PLUGIN_PASSWORD));
+ assertThat(sinkPluginConfig.getContentType(), is(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE));
+ assertThat(sinkPluginConfig.getMessageColumnName(), is(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME));
+ assertThat(sinkPluginConfig.getMaxBatchSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE));
+ assertThat(sinkPluginConfig.getMaxRecoveryQueueSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE));
+ }
+
+ @Test
+ public void testValidToString() throws Exception {
+ final TestDMaaPMRSinkPluginConfig sinkPluginConfig = getTestDMaaPMRSinkPluginConfig();
+ assertNotNull(sinkPluginConfig.toString());
+ assertTrue(sinkPluginConfig.toString().contains(DMAAP_MR_SINK_PLUGIN_HOST_NAME));
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java
new file mode 100644
index 0000000..bb4f7f6
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java
@@ -0,0 +1,84 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public class DMaaPMRSourcePluginConfigTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ @Test
+ public void testDMaaPMRSourcePluginConfigDefaults() throws Exception {
+ final DMaaPMRSourcePluginConfig sourcePluginConfig = new DMaaPMRSourcePluginConfig
+ (DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME, DMAAP_MR_SOURCE_PLUGIN_HOST_NAME,
+ DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME, DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);
+
+ assertThat(sourcePluginConfig.getReferenceName(), is(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME));
+ assertThat(sourcePluginConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME));
+ assertThat(sourcePluginConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME));
+ assertThat(sourcePluginConfig.getPollingInterval(), is(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL));
+ assertNull(sourcePluginConfig.getPortNumber());
+ assertNull(sourcePluginConfig.getProtocol());
+ assertNull(sourcePluginConfig.getUserName());
+ assertNull(sourcePluginConfig.getUserPassword());
+ assertNull(sourcePluginConfig.getContentType());
+ assertNull(sourcePluginConfig.getConsumerGroup());
+ assertNull(sourcePluginConfig.getConsumerId());
+ assertNull(sourcePluginConfig.getMessageLimit());
+ assertNull(sourcePluginConfig.getTimeoutMS());
+ }
+
+ @Test
+ public void testDMaaPMRSourcePluginConfigCustom() throws Exception {
+ final TestDMaaPMRSourcePluginConfig sourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ assertThat(sourcePluginConfig.getReferenceName(), is(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME));
+ assertThat(sourcePluginConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME));
+ assertThat(sourcePluginConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME));
+ assertThat(sourcePluginConfig.getPollingInterval(), is(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL));
+ assertThat(sourcePluginConfig.getPortNumber(), is(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER));
+ assertThat(sourcePluginConfig.getProtocol(), is(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL));
+ assertThat(sourcePluginConfig.getUserName(), is(DMAAP_MR_SOURCE_PLUGIN_USERNAME));
+ assertThat(sourcePluginConfig.getUserPassword(), is(DMAAP_MR_SOURCE_PLUGIN_PASSWORD));
+ assertThat(sourcePluginConfig.getContentType(), is(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE));
+ assertThat(sourcePluginConfig.getConsumerGroup(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP));
+ assertThat(sourcePluginConfig.getConsumerId(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID));
+ assertThat(sourcePluginConfig.getMessageLimit(), is(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT));
+ assertThat(sourcePluginConfig.getTimeoutMS(), is(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT));
+ }
+
+ @Test
+ public void testValidToString() throws Exception {
+ final TestDMaaPMRSourcePluginConfig sourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ assertNotNull(sourcePluginConfig.toString());
+ assertTrue(sourcePluginConfig.toString().contains(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME));
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java
new file mode 100644
index 0000000..8cba5e1
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java
@@ -0,0 +1,76 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import javax.annotation.Nullable;
+
+/**
+ * Test {@link DMaaPMRSinkPluginConfig} for testing purposes only
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public class TestDMaaPMRSinkPluginConfig extends DMaaPMRSinkPluginConfig {
+
+ public void setReferenceName(String referenceName) {
+ this.referenceName = referenceName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public void setPortNumber(@Nullable Integer portNumber) {
+ this.portNumber = portNumber;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public void setProtocol(@Nullable String protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setUserName(@Nullable String userName) {
+ this.userName = userName;
+ }
+
+ public void setUserPassword(@Nullable String userPassword) {
+ this.userPassword = userPassword;
+ }
+
+ public void setContentType(@Nullable String contentType) {
+ this.contentType = contentType;
+ }
+
+ public void setMaxBatchSize(@Nullable Integer maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+
+ public void setMaxRecoveryQueueSize(@Nullable Integer maxRecoveryQueueSize) {
+ this.maxRecoveryQueueSize = maxRecoveryQueueSize;
+ }
+
+ public void setMessageColumnName(String messageColumnName) {
+ this.messageColumnName = messageColumnName;
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java
new file mode 100644
index 0000000..c33f313
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java
@@ -0,0 +1,84 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import javax.annotation.Nullable;
+
+/**
+ * Test {@link DMaaPMRSourcePluginConfig} for testing purposes only
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public class TestDMaaPMRSourcePluginConfig extends DMaaPMRSourcePluginConfig {
+
+ public void setReferenceName(String referenceName) {
+ this.referenceName = referenceName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public void setPortNumber(@Nullable Integer portNumber) {
+ this.portNumber = portNumber;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public void setPollingInterval(Integer pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+ public void setProtocol(@Nullable String protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setUserName(@Nullable String userName) {
+ this.userName = userName;
+ }
+
+ public void setUserPassword(@Nullable String userPassword) {
+ this.userPassword = userPassword;
+ }
+
+ public void setContentType(@Nullable String contentType) {
+ this.contentType = contentType;
+ }
+
+ public void setConsumerId(@Nullable String consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ public void setConsumerGroup(@Nullable String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public void setTimeoutMS(@Nullable Integer timeoutMS) {
+ this.timeoutMS = timeoutMS;
+ }
+
+ public void setMessageLimit(@Nullable Integer messageLimit) {
+ this.messageLimit = messageLimit;
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java
new file mode 100644
index 0000000..efdd7ae
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java
@@ -0,0 +1,50 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter;
+
+/**
+ * @author Rajiv Singla . Creation Date: 3/3/2017.
+ */
+public class TestJsonPathFilterPluginConfig extends JsonPathFilterPluginConfig {
+
+ public TestJsonPathFilterPluginConfig(final String referenceName, final String incomingJsonFieldName,
+ final String outputSchemaFieldName, final String jsonFilterMappings,
+ final String schema) {
+ super(referenceName, incomingJsonFieldName, outputSchemaFieldName, jsonFilterMappings, schema);
+ }
+
+
+ public void setIncomingJsonFieldName(String incomingJsonFieldName) {
+ this.incomingJsonFieldName = incomingJsonFieldName;
+ }
+
+ public void setOutputSchemaFieldName(String outputSchemaFieldName) {
+ this.outputSchemaFieldName = outputSchemaFieldName;
+ }
+
+ public void setJsonFilterMappings(String jsonFilterMappings) {
+ this.jsonFilterMappings = jsonFilterMappings;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java
new file mode 100644
index 0000000..6bf3252
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java
@@ -0,0 +1,56 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/17/2017.
+ */
+public class TestSimpleTCAPluginConfig extends SimpleTCAPluginConfig {
+
+ public TestSimpleTCAPluginConfig(String vesMessageFieldName, String policyJson, String alertFieldName,
+ String messageTypeFieldName, String schema, Boolean enableAlertCEFFormat) {
+ super(vesMessageFieldName, policyJson, alertFieldName, messageTypeFieldName, schema, enableAlertCEFFormat);
+ }
+
+ public void setVesMessageFieldName(String vesMessageFieldName) {
+ this.vesMessageFieldName = vesMessageFieldName;
+ }
+
+ public void setPolicyJson(String policyJson) {
+ this.policyJson = policyJson;
+ }
+
+ public void setAlertFieldName(String alertFieldName) {
+ this.alertFieldName = alertFieldName;
+ }
+
+ public void setMessageTypeFieldName(String messageTypeFieldName) {
+ this.messageTypeFieldName = messageTypeFieldName;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+
+ public void setEnableAlertCEFFormat(Boolean enableAlertCEFFormat) {
+ this.enableAlertCEFFormat = enableAlertCEFFormat;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java
new file mode 100644
index 0000000..d14e184
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java
@@ -0,0 +1,63 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap;
+
+import co.cask.cdap.api.data.schema.Schema;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/25/2017.
+ */
+public class DMaaPSourceOutputSchemaTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testGetSchemaColumnName() throws Exception {
+ assertThat(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), is("ts"));
+ assertThat(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), is("responseCode"));
+ assertThat(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), is("responseMessage"));
+ assertThat(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), is("message"));
+ }
+
+ @Test
+ public void testGetSchema() throws Exception {
+ final Schema schema = DMaaPSourceOutputSchema.getSchema();
+ final List<Schema.Field> fields = schema.getFields();
+ final List<String> fieldNames = new LinkedList<>();
+ for (Schema.Field field : fields) {
+ fieldNames.add(field.getName());
+ }
+ assertThat(fieldNames, hasItems(
+ DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(),
+ DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(),
+ DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(),
+ DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName()));
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java
new file mode 100644
index 0000000..762e65a
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java
@@ -0,0 +1,229 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.it;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.api.dataset.table.Table;
+import co.cask.cdap.api.plugin.PluginClass;
+import co.cask.cdap.api.plugin.PluginPropertyField;
+import co.cask.cdap.common.utils.Tasks;
+import co.cask.cdap.datapipeline.DataPipelineApp;
+import co.cask.cdap.datapipeline.SmartWorkflow;
+import co.cask.cdap.etl.api.batch.SparkCompute;
+import co.cask.cdap.etl.mock.batch.MockSink;
+import co.cask.cdap.etl.mock.batch.MockSource;
+import co.cask.cdap.etl.mock.test.HydratorTestBase;
+import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
+import co.cask.cdap.etl.proto.v2.ETLPlugin;
+import co.cask.cdap.etl.proto.v2.ETLStage;
+import co.cask.cdap.proto.artifact.AppRequest;
+import co.cask.cdap.proto.artifact.ArtifactSummary;
+import co.cask.cdap.proto.id.ApplicationId;
+import co.cask.cdap.proto.id.ArtifactId;
+import co.cask.cdap.proto.id.NamespaceId;
+import co.cask.cdap.test.ApplicationManager;
+import co.cask.cdap.test.DataSetManager;
+import co.cask.cdap.test.WorkflowManager;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
+import org.onap.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;
+import org.onap.dcae.apod.analytics.common.validation.DCAEValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin
+ *
+ * @author Rajiv Singla . Creation Date: 2/17/2017.
+ */
+public class SimpleTCAPluginCDAPIT extends HydratorTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);
+
+ private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";
+ private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";
+
+ protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",
+ "4.0.0");
+ protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
+
+ private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING))
+ );
+
+ final Schema outputSchema = Schema.recordOf(
+ "outputSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
+ );
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+
+ setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
+
+
+ // Enable the below code if you want to run the test in Intelli IDEA editor
+ // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
+ // SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);
+
+ // Enable the below code if you want to run the test via command line
+ ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
+ CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);
+
+ addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,
+ ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,
+ CDAPAppSettingsValidator.class, DCAEValidator.class);
+ }
+
+ private static PluginClass getSimpleTCAPluginClass() {
+ final HashMap<String, PluginPropertyField> properties = new HashMap<>();
+ properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",
+ "string", false, false));
+ properties.put("referenceName", new PluginPropertyField("referenceName", "",
+ "string", false, false));
+ properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));
+ properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));
+ properties.put("messageTypeFieldName", new PluginPropertyField(
+ "messageTypeFieldName", "", "string", false, false));
+ properties.put("enableAlertCEFFormat", new PluginPropertyField(
+ "enableAlertCEFFormat", "", "string", false, false));
+ properties.put("schema", new PluginPropertyField(
+ "schema", "", "string", false, false));
+
+ return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),
+ "pluginConfig", properties);
+ }
+
+
+ @AfterClass
+ public static void cleanup() {
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testTransform() throws Exception {
+
+ LOG.info("Starting Test Transform");
+
+ final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");
+ final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");
+
+ final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()
+ .put("vesMessageFieldName", "message")
+ .put("referenceName", "SimpleTcaPlugin")
+ .put("policyJson", policyString)
+ .put("alertFieldName", "alert")
+ .put("messageTypeFieldName", "tcaMessageType")
+ .put("enableAlertCEFFormat", "true")
+ .put("schema", outputSchema.toString())
+ .build();
+
+ final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);
+ final ETLPlugin tcaPlugin =
+ new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);
+ final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");
+
+ final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")
+ .addStage(new ETLStage("source", mockSourcePlugin))
+ .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))
+ .addStage(new ETLStage("sink", mockSink))
+ .addConnection("source", "simpleTCAPlugin")
+ .addConnection("simpleTCAPlugin", "sink")
+ .build();
+
+ AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);
+ ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");
+ ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
+
+ List<StructuredRecord> sourceMessages = new ArrayList<>();
+ StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);
+ builder.set("message", cefMessage);
+ sourceMessages.add(builder.build());
+
+ // write records to source
+ DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));
+ MockSource.writeInput(inputManager, sourceMessages);
+
+ // manually trigger the pipeline
+ WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+ workflowManager.start();
+ workflowManager.waitForFinish(5, TimeUnit.MINUTES);
+
+ final DataSetManager<Table> outputManager = getDataset("tcaOutput");
+
+ Tasks.waitFor(
+ TCACalculatorMessageType.COMPLIANT.name(),
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ outputManager.flush();
+ List<String> tcaOutputMessageType = new LinkedList<>();
+ for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
+ tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());
+ final List<Schema.Field> fields = outputRecord.getSchema().getFields();
+ LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);
+ for (Schema.Field field : fields) {
+ LOG.debug("Field Name: {} - Field Type: {} ---> Field Value: {}",
+ field.getName(), field.getSchema().getType(),
+ outputRecord.get(field.getName()));
+ }
+
+ }
+ return tcaOutputMessageType.get(0);
+ }
+ },
+ 4,
+ TimeUnit.MINUTES);
+
+ }
+
+ private static String getFileContentAsString(final String fileLocation) throws Exception {
+ final URI tcaPolicyURI =
+ SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();
+ List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());
+ return Joiner.on("").join(lines);
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java
new file mode 100644
index 0000000..f7e379b
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java
@@ -0,0 +1,119 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.sparkcompute.tca;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.StageConfigurer;
+import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/17/2017.
+ */
+public class SimpleTCAPluginTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private SimpleTCAPlugin simpleTCAPlugin;
+
+ @Before
+ public void before() {
+ final TestSimpleTCAPluginConfig testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig();
+ Schema outputSchema = Schema.recordOf(
+ "TestSimpleTCAPluginInputSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
+ );
+ testSimpleTCAPluginConfig.setSchema(outputSchema.toString());
+ simpleTCAPlugin = new SimpleTCAPlugin(testSimpleTCAPluginConfig);
+ }
+
+ @Test
+ public void testConfigurePipeline() throws Exception {
+ final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class);
+ final StageConfigurer stageConfigurer = mock(StageConfigurer.class);
+ when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
+ when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema());
+ simpleTCAPlugin.configurePipeline(pipelineConfigurer);
+ verify(stageConfigurer, times(1)).getInputSchema();
+ }
+
+ @Test
+ public void testTransform() throws Exception {
+
+ JavaSparkContext javaSparkContext = new JavaSparkContext("local", "test");
+
+ Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
+ Schema.Field.of("message", Schema.of(Schema.Type.STRING))
+ );
+
+ // Inapplicable Message Structured Record
+ final StructuredRecord inapplicableSR =
+ StructuredRecord.builder(sourceSchema).set("message", "test").build();
+ // compliant
+ final StructuredRecord compliantSR =
+ StructuredRecord.builder(sourceSchema).set("message",
+ fromStream(CEF_MESSAGE_JSON_FILE_LOCATION)).build();
+ // non compliant
+ final String nonCompliantCEF = fromStream(CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION);
+ final StructuredRecord nonCompliantSR =
+ StructuredRecord.builder(sourceSchema).set("message", nonCompliantCEF).build();
+
+ final List<StructuredRecord> records = new LinkedList<>();
+ records.add(inapplicableSR);
+ records.add(compliantSR);
+ records.add(nonCompliantSR);
+
+ final JavaRDD<StructuredRecord> input =
+ javaSparkContext.parallelize(records);
+ final SparkExecutionPluginContext context = Mockito.mock(SparkExecutionPluginContext.class);
+ final MockStageMetrics stageMetrics = Mockito.mock(MockStageMetrics.class);
+ when(context.getMetrics()).thenReturn(stageMetrics);
+ final List<StructuredRecord> outputRecord = simpleTCAPlugin.transform(context, input).collect();
+ assertNotNull(outputRecord);
+ assertThat(outputRecord.size(), is(3));
+
+ assertTrue(outputRecord.get(0).get("tcaMessageType").equals(TCACalculatorMessageType.INAPPLICABLE.toString()));
+ assertTrue(outputRecord.get(1).get("tcaMessageType").equals(TCACalculatorMessageType.COMPLIANT.toString()));
+ assertTrue(outputRecord.get(2).get("tcaMessageType").equals(TCACalculatorMessageType.NON_COMPLIANT.toString()));
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java
new file mode 100644
index 0000000..aca5581
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java
@@ -0,0 +1,75 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/24/2017.
+ */
+public class DMaaPMRReceiverTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testStoreStructuredRecords() throws Exception {
+
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final TestDMaaPMRReceiver dMaaPMRReceiver =
+ new TestDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), testDMaaPMRSourcePluginConfig);
+
+ final DMaaPMRSubscriber dMaaPMRSubscriber = Mockito.mock(DMaaPMRSubscriber.class);
+ final DMaaPMRSubscriberResponse subscriberResponse = Mockito.mock(DMaaPMRSubscriberResponse.class);
+ when(dMaaPMRSubscriber.fetchMessages()).thenReturn(subscriberResponse);
+ when(subscriberResponse.getFetchedMessages()).thenReturn(ImmutableList.of("Test Message"));
+ when(subscriberResponse.getResponseCode()).thenReturn(200);
+ when(subscriberResponse.getResponseMessage()).thenReturn("OK");
+ dMaaPMRReceiver.storeStructuredRecords(dMaaPMRSubscriber);
+ verify(dMaaPMRSubscriber, times(1)).fetchMessages();
+ verify(subscriberResponse, times(1)).getFetchedMessages();
+ }
+
+ @Test
+ public void testStoreStructuredRecordsWhenSubscriberThrowsException() throws Exception {
+
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final TestDMaaPMRReceiver dMaaPMRReceiver =
+ new TestDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), testDMaaPMRSourcePluginConfig);
+
+ final DMaaPMRSubscriber dMaaPMRSubscriber = Mockito.mock(DMaaPMRSubscriber.class);
+ final DMaaPMRSubscriberResponse subscriberResponse = Mockito.mock(DMaaPMRSubscriberResponse.class);
+ when(dMaaPMRSubscriber.fetchMessages()).thenThrow(DCAEAnalyticsRuntimeException.class);
+ dMaaPMRReceiver.storeStructuredRecords(dMaaPMRSubscriber);
+ verify(dMaaPMRSubscriber, times(1)).fetchMessages();
+ verify(subscriberResponse, times(0)).getFetchedMessages();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java
new file mode 100644
index 0000000..7b1f876
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java
@@ -0,0 +1,91 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.StageConfigurer;
+import co.cask.cdap.etl.api.streaming.StreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/24/2017.
+ */
+public class DMaaPMRSourceTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private PipelineConfigurer pipelineConfigurer;
+
+ @Before
+ public void before() {
+ pipelineConfigurer = Mockito.mock(PipelineConfigurer.class);
+ final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class);
+ when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
+ doNothing().when(stageConfigurer).setOutputSchema(any(Schema.class));
+ }
+
+ @Test
+ public void testDMaaPMRSourceConfigurePipelineWithValidPluginSettings() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig);
+ dMaaPMRSource.configurePipeline(pipelineConfigurer);
+ assertNotNull(dMaaPMRSource);
+ }
+
+ @Test(expected = CDAPSettingsException.class)
+ public void testDMaaPMRSourceConfigurePipelineWithInvalidPluginSettings() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ // blank out DMaaP MR Source Host
+ testDMaaPMRSourcePluginConfig.setHostName(null);
+ final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig);
+ dMaaPMRSource.configurePipeline(pipelineConfigurer);
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGetStream() throws Exception {
+ final StreamingContext streamingContext = Mockito.mock(StreamingContext.class);
+ final JavaStreamingContext javaStreamingContext = Mockito.mock(JavaStreamingContext.class);
+ final JavaReceiverInputDStream dMaaPMRReceiver = Mockito.mock(JavaReceiverInputDStream.class);
+ when(streamingContext.getSparkStreamingContext()).thenReturn(javaStreamingContext);
+ when(javaStreamingContext.receiverStream(any(Receiver.class))).thenReturn(dMaaPMRReceiver);
+
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig);
+ final JavaDStream<StructuredRecord> stream = dMaaPMRSource.getStream(streamingContext);
+ assertNotNull(stream);
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java
new file mode 100644
index 0000000..cd3e4bd
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java
@@ -0,0 +1,81 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/20/2017.
+ */
+public class MockDMaaPMRReceiverTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ protected class TestMockDMaaPMRReceiverTest extends MockDMaaPMRReceiver {
+
+ private boolean canStop = false;
+
+ public TestMockDMaaPMRReceiverTest(StorageLevel storageLevel, DMaaPMRSourcePluginConfig pluginConfig) {
+ super(storageLevel, pluginConfig);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return canStop;
+ }
+
+ @Override
+ public void store(StructuredRecord dataItem) {
+ LOG.debug("Mocking storing dataItem - {}",
+ dataItem.get(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName()));
+ }
+
+ public void setCanStop(boolean canStop) {
+ this.canStop = canStop;
+ }
+ }
+
+ @Test
+ public void testStoreStructuredRecords() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ testDMaaPMRSourcePluginConfig.setPollingInterval(100);
+ final TestMockDMaaPMRReceiverTest mockDMaaPMRReceiver = new TestMockDMaaPMRReceiverTest(StorageLevel
+ .MEMORY_ONLY(),
+ testDMaaPMRSourcePluginConfig);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ mockDMaaPMRReceiver.storeStructuredRecords(null);
+ }
+ }).start();
+ TimeUnit.MILLISECONDS.sleep(1000);
+ LOG.info("Killing Mock Subscriber after 1 ms");
+ mockDMaaPMRReceiver.setCanStop(true);
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java
new file mode 100644
index 0000000..6a842d0
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java
@@ -0,0 +1,74 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.etl.api.streaming.StreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/20/2017.
+ */
+@SuppressWarnings("unchecked")
+public class MockDMaaPMRSourceTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ @Test
+ public void testGetStream() throws Exception {
+ final StreamingContext streamingContext = Mockito.mock(StreamingContext.class);
+ final JavaStreamingContext javaStreamingContext = Mockito.mock(JavaStreamingContext.class);
+ final JavaReceiverInputDStream dMaaPMRReceiver = Mockito.mock(JavaReceiverInputDStream.class);
+ when(streamingContext.getSparkStreamingContext()).thenReturn(javaStreamingContext);
+ when(javaStreamingContext.receiverStream(any(Receiver.class))).thenReturn(dMaaPMRReceiver);
+
+ MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(getTestDMaaPMRSourcePluginConfig());
+ final JavaDStream<StructuredRecord> stream = mockDMaaPMRSource.getStream(streamingContext);
+ assertNotNull(stream);
+ }
+
+ @Test(expected = CDAPSettingsException.class)
+ public void testConfigurePipelineWhenPollingIntervalNotPresent() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ testDMaaPMRSourcePluginConfig.setPollingInterval(null);
+ final MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(testDMaaPMRSourcePluginConfig);
+ mockDMaaPMRSource.configurePipeline(null);
+ }
+
+ @Test
+ public void testConfigurePipelineWhenPollingIntervalIsPresent() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(testDMaaPMRSourcePluginConfig);
+ mockDMaaPMRSource.configurePipeline(null);
+ assertNotNull(mockDMaaPMRSource);
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java
new file mode 100644
index 0000000..0d87496
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.metrics.Metrics;
+import org.apache.spark.storage.StorageLevel;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+
+/**
+ * Test implementation for {@link DMaaPMRReceiver}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/24/2017.
+ */
+public class TestDMaaPMRReceiver extends DMaaPMRReceiver {
+
+ protected static Metrics metrics;
+
+ static {
+ metrics = Mockito.mock(Metrics.class);
+ doNothing().when(metrics).count(anyString(), anyInt());
+ doNothing().when(metrics).gauge(anyString(), anyInt());
+ }
+
+
+ public TestDMaaPMRReceiver(StorageLevel storageLevel, DMaaPMRSourcePluginConfig pluginConfig) {
+
+ super(storageLevel, pluginConfig, metrics);
+ }
+
+ @Override
+ public void store(StructuredRecord dataItem) {
+ // do nothing
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java
new file mode 100644
index 0000000..8bf91c8
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java
@@ -0,0 +1,84 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.transform.filter;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.Emitter;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.StageConfigurer;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+
+import java.util.Date;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Rajiv Singla . Creation Date: 3/3/2017.
+ */
+public class JsonPathFilterTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testInitializeWhenFilterMappingIsValid() throws Exception {
+ final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig());
+ jsonPathFilter.initialize(null);
+ }
+
+
+ @Test
+ public void configurePipeline() throws Exception {
+ final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig());
+ final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class);
+ final StageConfigurer stageConfigurer = mock(StageConfigurer.class);
+ when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
+ when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema());
+ doNothing().when(stageConfigurer).setOutputSchema(any(Schema.class));
+ jsonPathFilter.configurePipeline(pipelineConfigurer);
+ verify(stageConfigurer, times(1)).setOutputSchema(any(Schema.class));
+ }
+
+ @Test
+ public void testTransform() throws Exception {
+ final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig());
+ jsonPathFilter.initialize(null);
+ final StructuredRecord inputSR = StructuredRecord.builder(getJsonFilterPluginInputSchema())
+ .set("ts", new Date().getTime())
+ .set("responseCode", 200)
+ .set("responseMessage", "OK")
+ .set("message", fromStream(CEF_MESSAGE_JSON_FILE_LOCATION))
+ .build();
+
+ final Emitter emitter = Mockito.mock(Emitter.class);
+ doNothing().when(emitter).emit(ArgumentMatchers.any(StructuredRecord.class));
+ jsonPathFilter.transform(inputSR, emitter);
+ verify(emitter, times(1)).emit(any(StructuredRecord.class));
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java
new file mode 100644
index 0000000..2792e17
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java
@@ -0,0 +1,171 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.utils;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class CDAPPluginUtilsTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testValidateSchemaContainsFieldsWhenSchemaIsNotNull() throws Exception {
+ final Schema dMaaPMRSinkTestSchema = getDMaaPMRSinkTestSchema();
+ CDAPPluginUtils.validateSchemaContainsFields(dMaaPMRSinkTestSchema, "message");
+ }
+
+ @Test
+ public void testValidateSchemaContainsFieldsWhenInputSchemaIsNull() throws Exception {
+ CDAPPluginUtils.validateSchemaContainsFields(null, "message");
+ }
+
+ @Test
+ public void testCreateStructuredRecord() throws Exception {
+ final StructuredRecord testMessage = CDAPPluginUtils.createDMaaPMRResponseStructuredRecord("testMessage");
+ assertNotNull(testMessage);
+ }
+
+
+ @Test
+ public void testCreateOutputStructuredRecordBuilder() throws Exception {
+
+ final String messageFieldName = "message";
+ final String firstInputFieldName = "inputField1";
+ final String secondInputFieldName = "inputField2";
+
+
+ final Schema inputSchema = Schema.recordOf(
+ "inputSchema",
+ Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)),
+ Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(secondInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING)))
+ );
+
+ final String addedFieldName = "addedField";
+ final Schema outputSchema = Schema.recordOf(
+ "outputSchema",
+ Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)),
+ Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(addedFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))) // added field
+ // missing second Input Field
+ );
+
+ // input structured record
+ final String messageFieldValue = "Message String";
+ final String firstFieldValue = "Input Field 1";
+ final String secondFieldValue = "Input Field 2";
+ final StructuredRecord inputSR = StructuredRecord.builder(inputSchema)
+ .set(messageFieldName, messageFieldValue)
+ .set(firstInputFieldName, firstFieldValue)
+ .set(secondInputFieldName, secondFieldValue)
+ .build();
+
+ final StructuredRecord.Builder outputStructuredRecordBuilder =
+ CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputSR);
+
+ final String addedFieldValue = "Added Field Value";
+ final StructuredRecord outputSR = outputStructuredRecordBuilder
+ .set(addedFieldName, addedFieldValue)
+ .build();
+
+ assertThat("Added Field field value copied correctly",
+ outputSR.get(addedFieldName).toString(), is(addedFieldValue));
+
+ assertThat("Output SR has message field copied correctly",
+ outputSR.get(messageFieldName).toString(), is(messageFieldValue));
+
+ assertThat("First Field value copied correctly",
+ outputSR.get(firstInputFieldName).toString(), is(firstFieldValue));
+
+ assertNull("Second Field value is null as output schema does not have the field",
+ outputSR.get(secondInputFieldName));
+
+ }
+
+
+ @Test
+ public void testAddFieldValueToStructuredRecordBuilder() throws Exception {
+
+ final String messageFieldName = "message";
+ final String firstInputFieldName = "inputField1";
+ final String addedFieldName = "addedField";
+ final String firstFieldValue = "Input Field 1";
+ final String addedFieldValue = "Added Field Value";
+ final Schema outputSchema = Schema.recordOf(
+ "outputSchema",
+ Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)),
+ Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(addedFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))) // added field
+ );
+
+ final StructuredRecord.Builder outputSRBuilder = StructuredRecord.builder(outputSchema)
+ .set(messageFieldName, "Some message")
+ .set(firstInputFieldName, firstFieldValue);
+
+ final StructuredRecord.Builder addedFieldSRBuilder = CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(
+ outputSRBuilder, outputSchema, addedFieldName, addedFieldValue);
+
+ // Try adding field to output Structured record that is not in output schema
+ final String nonExistentFieldName = "fieldNotInOutputSchema";
+ final String nonExistentFieldValue = "Some Value";
+ final StructuredRecord outputSR = CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(
+ addedFieldSRBuilder, outputSchema, nonExistentFieldName, nonExistentFieldValue).build();
+
+ assertThat("Output SR must contain added Field which is in output schema",
+ outputSR.get(addedFieldName).toString(), is(addedFieldValue));
+ assertNull("Output SR must not contain field that is not in output schema",
+ outputSR.get(nonExistentFieldName));
+
+ }
+
+ @Test(expected = DCAEAnalyticsRuntimeException.class)
+ public void testValidateSchemaFieldTypeWhenInputSchemaIsNotValidJson() throws Exception {
+ CDAPPluginUtils.validateSchemaFieldType("Invalid Schema", "field1", Schema.Type.STRING);
+ }
+
+ @Test(expected = DCAEAnalyticsRuntimeException.class)
+ public void testSetOutputSchemaWhenOutputSchemaIsNotValidJson() throws Exception {
+ CDAPPluginUtils.setOutputSchema(null, "Invalid output Schema");
+ }
+
+ @Test(expected = DCAEAnalyticsRuntimeException.class)
+ public void testExtractFieldMappingsWhenFieldMappingValueIsEmpty() throws Exception {
+ CDAPPluginUtils.extractFieldMappings("path1:,path2:value2");
+ }
+
+ @Test(expected = DCAEAnalyticsRuntimeException.class)
+ public void testExtractFieldMappingsWhenFieldMappingAreBlank() throws Exception {
+ CDAPPluginUtils.extractFieldMappings("path1: ,path2:value2");
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java
new file mode 100644
index 0000000..9b6f7f8
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java
@@ -0,0 +1,57 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPSinkConfigMapperTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+
+ @Test
+ public void testMapToPublisherConfig() throws Exception {
+
+ final Configuration testConfiguration = getTestConfiguration();
+ final DMaaPMRPublisherConfig publisherConfig = DMaaPSinkConfigMapper.map(testConfiguration);
+
+ assertNotNull(publisherConfig);
+ assertThat(publisherConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME));
+ assertThat(publisherConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME));
+ assertThat(publisherConfig.getPortNumber(), is(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER));
+ assertThat(publisherConfig.getProtocol(), is(DMAAP_MR_SINK_PLUGIN_PROTOCOL));
+ assertThat(publisherConfig.getUserName(), is(DMAAP_MR_SINK_PLUGIN_USERNAME));
+ assertThat(publisherConfig.getUserPassword(), is(DMAAP_MR_SINK_PLUGIN_PASSWORD));
+ assertThat(publisherConfig.getContentType(), is(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE));
+ assertThat(publisherConfig.getMaxBatchSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE));
+ assertThat(publisherConfig.getMaxRecoveryQueueSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE));
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java
new file mode 100644
index 0000000..6e79f47
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java
@@ -0,0 +1,64 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.utils;
+
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/24/2017.
+ */
+public class DMaaPSourceConfigMapperTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ @Test
+ public void testMapToSubscriberConfig() throws Exception {
+
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ final DMaaPMRSubscriberConfig subscriberConfig = DMaaPSourceConfigMapper.map(testDMaaPMRSourcePluginConfig);
+
+ assertNotNull(subscriberConfig);
+ assertThat(subscriberConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME));
+ assertThat(subscriberConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME));
+ assertThat(subscriberConfig.getPortNumber(), is(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER));
+ assertThat(subscriberConfig.getProtocol(), is(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL));
+ assertThat(subscriberConfig.getUserName(), is(DMAAP_MR_SOURCE_PLUGIN_USERNAME));
+ assertThat(subscriberConfig.getUserPassword(), is(DMAAP_MR_SOURCE_PLUGIN_PASSWORD));
+ assertThat(subscriberConfig.getContentType(), is(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE));
+ assertThat(subscriberConfig.getConsumerGroup(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP));
+ assertThat(subscriberConfig.getConsumerId(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID));
+ assertThat(subscriberConfig.getMessageLimit(), is(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT));
+ assertThat(subscriberConfig.getTimeoutMS(), is(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMapToSubscriberConfigWhenSubscriberHostNameIsEmpty() throws Exception {
+ final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ testDMaaPMRSourcePluginConfig.setHostName(null);
+ DMaaPSourceConfigMapper.map(testDMaaPMRSourcePluginConfig);
+
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java
new file mode 100644
index 0000000..904a74b
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java
@@ -0,0 +1,86 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
+import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRSinkPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private TestDMaaPMRSinkPluginConfig sinkPluginConfig;
+ private DMaaPMRSinkPluginConfigValidator sinkPluginConfigValidator;
+
+ @Before
+ public void before() {
+ sinkPluginConfigValidator = new DMaaPMRSinkPluginConfigValidator();
+ sinkPluginConfig = getTestDMaaPMRSinkPluginConfig();
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSinkConfig() throws Exception {
+ final GenericValidationResponse<DMaaPMRSinkPluginConfig> validationResponse =
+ sinkPluginConfigValidator.validateAppSettings(sinkPluginConfig);
+ assertFalse(validationResponse.hasErrors());
+ }
+
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSinkConfigWhenHostNameIsNotPresent() throws Exception {
+ sinkPluginConfig.setHostName(null);
+ assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSinkConfigWhenHostPortIsNotPresent() throws Exception {
+ sinkPluginConfig.setPortNumber(null);
+ assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSinkConfigWhenTopicNameIsNotPresent() throws Exception {
+ sinkPluginConfig.setTopicName(null);
+ assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSinkConfigWhenColumnNameIsNotPresent() throws Exception {
+ sinkPluginConfig.setMessageColumnName(null);
+ assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator);
+ }
+
+ private static void assertResponseHasErrors(final TestDMaaPMRSinkPluginConfig sinkPluginConfig,
+ final DMaaPMRSinkPluginConfigValidator validator) {
+ final GenericValidationResponse validationResponse = validator.validateAppSettings(sinkPluginConfig);
+ assertTrue(validationResponse.hasErrors());
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java
new file mode 100644
index 0000000..b4357e8
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java
@@ -0,0 +1,85 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRSourcePluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private TestDMaaPMRSourcePluginConfig sourcePluginConfig;
+ private DMaaPMRSourcePluginConfigValidator sourcePluginConfigValidator;
+
+ @Before
+ public void before() {
+ sourcePluginConfigValidator = new DMaaPMRSourcePluginConfigValidator();
+ sourcePluginConfig = getTestDMaaPMRSourcePluginConfig();
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSourceConfig() throws Exception {
+ final GenericValidationResponse<DMaaPMRSourcePluginConfig> validationResponse =
+ sourcePluginConfigValidator.validateAppSettings(sourcePluginConfig);
+ assertFalse(validationResponse.hasErrors());
+ }
+
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSourceConfigWhenHostNameIsNotPresent() throws Exception {
+ sourcePluginConfig.setHostName(null);
+ assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSourceConfigWhenHostPortIsNotPresent() throws Exception {
+ sourcePluginConfig.setPortNumber(null);
+ assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSourceConfigWhenTopicNameIsNotPresent() throws Exception {
+ sourcePluginConfig.setTopicName(null);
+ assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator);
+ }
+
+ @Test
+ public void validateAppSettingsWithValidDMaaPSourcePollingIntervalIsNotPresent() throws Exception {
+ sourcePluginConfig.setPollingInterval(null);
+ assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator);
+ }
+
+ private static void assertResponseHasErrors(final TestDMaaPMRSourcePluginConfig sourcePluginConfig,
+ final DMaaPMRSourcePluginConfigValidator validator) {
+ final GenericValidationResponse validationResponse = validator.validateAppSettings(sourcePluginConfig);
+ assertTrue(validationResponse.hasErrors());
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java
new file mode 100644
index 0000000..4eff03c
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java
@@ -0,0 +1,107 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;
+import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 3/3/2017.
+ */
+public class JsonPathFilterPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private TestJsonPathFilterPluginConfig jsonPathFilterPluginConfig;
+ private JsonPathFilterPluginConfigValidator jsonPathFilterPluginConfigValidator;
+
+ @Before
+ public void before() {
+ jsonPathFilterPluginConfig = getJsonPathFilterPluginConfig();
+ jsonPathFilterPluginConfigValidator = new JsonPathFilterPluginConfigValidator();
+ }
+
+
+ @Test
+ public void testValidateAppSettingsWhenNoValidationErrors() throws Exception {
+ final GenericValidationResponse<JsonPathFilterPluginConfig> validationResponse =
+ jsonPathFilterPluginConfigValidator.validateAppSettings(jsonPathFilterPluginConfig);
+ assertFalse(validationResponse.hasErrors());
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenFilterMappingsAreEmpty() throws Exception {
+ jsonPathFilterPluginConfig.setJsonFilterMappings("");
+ assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenOutputSchemaIsNotPresent() throws Exception {
+ jsonPathFilterPluginConfig.setSchema(null);
+ assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenOutputSchemaFilterMatchedFieldIsNotBoolean() throws Exception {
+ final String outputSchemaWithMatchedFieldNotBoolean =
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\",\"fields\":" +
+ "[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"filterMatched\",\"type\":[\"string\",\"null\"]}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}" +
+ "]" +
+ "}";
+ jsonPathFilterPluginConfig.setSchema(outputSchemaWithMatchedFieldNotBoolean);
+ assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenOutputSchemaFilterMatchedFieldIsNotNullable() throws Exception {
+ final String outputSchemaWithMatchedFieldNotNullable =
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\",\"fields\":" +
+ "[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"filterMatched\",\"type\":\"boolean\"}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}" +
+ "]" +
+ "}";
+ jsonPathFilterPluginConfig.setSchema(outputSchemaWithMatchedFieldNotNullable);
+ assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator);
+ }
+
+ private static void assertResponseHasErrors(final TestJsonPathFilterPluginConfig jsonPluginConfig,
+ final JsonPathFilterPluginConfigValidator validator) {
+ final GenericValidationResponse validationResponse = validator.validateAppSettings(jsonPluginConfig);
+ assertTrue(validationResponse.hasErrors());
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java
new file mode 100644
index 0000000..284bec3
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java
@@ -0,0 +1,157 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
+import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/21/2017.
+ */
+public class SimpleTCAPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest {
+
+ private TestSimpleTCAPluginConfig testSimpleTCAPluginConfig;
+ private SimpleTCAPluginConfigValidator simpleTCAPluginConfigValidator;
+
+ @Before
+ public void before() {
+ testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig();
+ simpleTCAPluginConfigValidator = new SimpleTCAPluginConfigValidator();
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAllSettingsAreValid() throws Exception {
+ final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse =
+ simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig);
+ assertFalse(validationResponse.hasErrors());
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenVESMessageFieldNameIsMissing() throws Exception {
+ testSimpleTCAPluginConfig.setVesMessageFieldName(null);
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenPolicyJsonIsMissing() throws Exception {
+ testSimpleTCAPluginConfig.setPolicyJson(null);
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAlertFieldNameIsMissing() throws Exception {
+ testSimpleTCAPluginConfig.setAlertFieldName(null);
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenOutputSchemaIsNull() throws Exception {
+ testSimpleTCAPluginConfig.setSchema(null);
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenMessageTypeFieldNameIsMissing() throws Exception {
+ testSimpleTCAPluginConfig.setMessageTypeFieldName(null);
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAlertFieldIsNullableInOutputSchema() throws Exception {
+ testSimpleTCAPluginConfig.setSchema(
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\"," +
+ "\"fields\":[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}," +
+ "{\"name\":\"alert\",\"type\":[\"string\",\"null\"]}," +
+ "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}");
+ final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse =
+ simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig);
+ assertFalse(validationResponse.hasErrors());
+
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAlertFieldIsNotPresentInOutputSchema() throws Exception {
+ testSimpleTCAPluginConfig.setSchema(
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\"," +
+ "\"fields\":[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}," +
+ "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}");
+ final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse =
+ simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig);
+ assertFalse(validationResponse.hasErrors());
+
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAlertFieldIsNullableButNotStringTypeInOutputSchema() throws Exception {
+ testSimpleTCAPluginConfig.setSchema(
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\"," +
+ "\"fields\":[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}," +
+ "{\"name\":\"alert\",\"type\":[\"int\",\"null\"]}," +
+ "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}");
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+ @Test
+ public void testValidateAppSettingsWhenAlertFieldNameIsNotNullableInOutputSchema() throws Exception {
+ testSimpleTCAPluginConfig.setSchema(
+ "{\"type\":\"record\"," +
+ "\"name\":\"etlSchemaBody\"," +
+ "\"fields\":[" +
+ "{\"name\":\"ts\",\"type\":\"long\"}," +
+ "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+ "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+ "{\"name\":\"message\",\"type\":\"string\"}," +
+ "{\"name\":\"alert\",\"type\":\"string\"}," +
+ "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}");
+ assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator);
+ }
+
+
+
+ private static void assertResponseHasErrors(final TestSimpleTCAPluginConfig pluginConfig,
+ final SimpleTCAPluginConfigValidator validator) {
+ final GenericValidationResponse validationResponse = validator.validateAppSettings(pluginConfig);
+ assertTrue(validationResponse.hasErrors());
+ LOG.debug("Validation Error Message: {}", validationResponse.getAllErrorMessage());
+ }
+}