diff options
Diffstat (limited to 'dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae')
27 files changed, 2578 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java new file mode 100644 index 0000000..e9b4e6f --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/BaseAnalyticsCDAPPluginsUnitTest.java @@ -0,0 +1,238 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins; + +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.StageMetrics; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import org.apache.hadoop.conf.Configuration; +import org.onap.dcae.apod.analytics.cdap.common.CDAPPluginConstants; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig; +import org.onap.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier; +import org.onap.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest; + +import java.io.IOException; +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * @author Rajiv Singla . Creation Date: 1/23/2017. + */ +public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest { + + protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER = + Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get(); + + protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json"; + protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json"; + protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION = + "data/json/cef/cef_message_with_threshold_violation.json"; + + + protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com"; + protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905; + protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub"; + protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000; + protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1"; + protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1"; + protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100; + protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000; + + + protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK"; + protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com"; + protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905; + protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub"; + protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https"; + protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username"; + protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password"; + protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json"; + protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message"; + protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10; + protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100; + + protected static final String VES_MESSAGE_FIELD_NAME = "message"; + protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert"; + protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType"; + + + protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter"; + protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message"; + protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched"; + protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS = + "$.event.commonEventHeader.domain:measurementsForVfScaling," + + "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall"; + protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA = + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\",\"fields\":" + + "[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}" + + "]" + + "}"; + + protected static class MockStageMetrics implements StageMetrics, Serializable { + + @Override + public void count(String metricName, int delta) { + LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta); + } + + @Override + public void gauge(String metricName, long value) { + LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value); + } + + @Override + public void pipelineCount(String metricName, int delta) { + LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta); + } + + @Override + public void pipelineGauge(String metricName, long value) { + LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value); + } + } + + protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() { + final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig(); + sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME); + sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME); + sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER); + sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME); + sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL); + sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL); + sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME); + sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD); + sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE); + sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP); + sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID); + sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT); + sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT); + return sourcePluginConfig; + } + + protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() { + final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig(); + sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME); + sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME); + sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER); + sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME); + sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL); + sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME); + sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD); + sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE); + sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME); + sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE); + sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE); + return sinkPluginConfig; + } + + + protected static Configuration getTestConfiguration() { + final Configuration configuration = new Configuration(); + final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap(); + for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) { + configuration.set(property.getKey(), property.getValue()); + } + return configuration; + } + + protected static Map<String, String> createSinkConfigurationMap() { + + Map<String, String> sinkConfig = new LinkedHashMap<>(); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, + DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString()); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, + DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, + DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString()); + sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, + DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString()); + return sinkConfig; + } + + protected static Schema getDMaaPMRSinkTestSchema() { + return Schema.recordOf( + "DMaaPMRSinkTestSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)), + Schema.Field.of("field1", Schema.of(Schema.Type.STRING)) + ); + } + + + protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() { + final String policyJson; + try { + policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION); + } catch (IOException e) { + throw new RuntimeException("Error while parsing policy", e); + } + return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME, + TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false); + } + + protected static Schema getSimpleTCAPluginInputSchema() { + return Schema.recordOf( + "TestSimpleTCAPluginInputSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)), + Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING))) + ); + } + + protected static Schema getJsonFilterPluginInputSchema() { + return Schema.recordOf( + "TestJsonFilterInputSchema", + Schema.Field.of("ts", Schema.of(Schema.Type.LONG)), + Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)), + Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)), + Schema.Field.of("message", Schema.of(Schema.Type.STRING)) + ); + } + + protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() { + return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME, + JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME, + JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME, + JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS, + JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java new file mode 100644 index 0000000..79c7698 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProviderTest.java @@ -0,0 +1,77 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMROutputFormatProviderTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testDMaaPMROutputFormatProviderWhenConfigIsMissingNonRequiredValues() throws Exception { + final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig(); + sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME); + sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME); + final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider = + new DMaaPMROutputFormatProvider(sinkPluginConfig); + final Map<String, String> outputFormatConfiguration = + dMaaPMROutputFormatProvider.getOutputFormatConfiguration(); + final String hostName = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME); + assertTrue(hostName.equals(DMAAP_MR_SINK_PLUGIN_HOST_NAME)); + final String topicName = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME); + assertTrue(topicName.equals(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME)); + final String portNumber = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER); + assertTrue(portNumber.equals(AnalyticsConstants.DEFAULT_PORT_NUMBER.toString())); + final String protocol = outputFormatConfiguration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL); + assertTrue(protocol.equals(AnalyticsConstants.DEFAULT_PROTOCOL)); + } + + @Test + public void testGetOutputFormatClassName() throws Exception { + final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider = + new DMaaPMROutputFormatProvider(getTestDMaaPMRSinkPluginConfig()); + final String outputFormatClassName = dMaaPMROutputFormatProvider.getOutputFormatClassName(); + assertTrue(outputFormatClassName.equals(DMaaPMROutputFormat.class.getName())); + } + + @Test + public void testGetOutputFormatConfiguration() throws Exception { + final TestDMaaPMRSinkPluginConfig testDMaaPMRSinkPluginConfig = getTestDMaaPMRSinkPluginConfig(); + final DMaaPMROutputFormatProvider dMaaPMROutputFormatProvider = + new DMaaPMROutputFormatProvider(testDMaaPMRSinkPluginConfig); + final Map<String, String> outputFormatConfiguration = + dMaaPMROutputFormatProvider.getOutputFormatConfiguration(); + assertTrue(outputFormatConfiguration.size() == 9); + + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java new file mode 100644 index 0000000..4b111a3 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatTest.java @@ -0,0 +1,75 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMROutputFormatTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private DMaaPMROutputFormat dMaaPMROutputFormat; + + @Before + public void before() { + dMaaPMROutputFormat = new DMaaPMROutputFormat(); + } + + @Test + public void testGetRecordWriter() throws Exception { + final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class); + when(taskAttemptContext.getConfiguration()).thenReturn(getTestConfiguration()); + final RecordWriter<String, NullWritable> recordWriter = dMaaPMROutputFormat.getRecordWriter(taskAttemptContext); + assertNotNull(recordWriter); + final JobContext jobContext = Mockito.mock(JobContext.class); + dMaaPMROutputFormat.checkOutputSpecs(jobContext); + } + + @Test + public void testGetOutputCommitter() throws Exception { + final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class); + final OutputCommitter outputCommitter = dMaaPMROutputFormat.getOutputCommitter(taskAttemptContext); + assertTrue(outputCommitter.getClass().equals(DMaaPMROutputFormat.NoOpOutputCommitter.class)); + final JobContext jobContext = Mockito.mock(JobContext.class); + outputCommitter.setupJob(jobContext); + outputCommitter.setupTask(taskAttemptContext); + assertFalse(outputCommitter.needsTaskCommit(taskAttemptContext)); + outputCommitter.commitJob(jobContext); + outputCommitter.commitTask(taskAttemptContext); + outputCommitter.abortTask(taskAttemptContext); + + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java new file mode 100644 index 0000000..3d79057 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriterTest.java @@ -0,0 +1,62 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; + +import java.util.Arrays; + +import static org.mockito.Mockito.times; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRRecordWriterTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private DMaaPMRPublisher publisher; + private DMaaPMRRecordWriter dMaaPMRRecordWriter; + + @Before + public void before() { + publisher = Mockito.mock(DMaaPMRPublisher.class); + dMaaPMRRecordWriter = new DMaaPMRRecordWriter(publisher); + } + + @Test + public void testWrite() throws Exception { + final String testMessage = "test Message"; + dMaaPMRRecordWriter.write(testMessage, null); + Mockito.verify(publisher, times(1)).publish(Arrays.asList(testMessage)); + } + + @Test + public void testClose() throws Exception { + final TaskAttemptContext taskAttemptContext = Mockito.mock(TaskAttemptContext.class); + dMaaPMRRecordWriter.close(taskAttemptContext); + Mockito.verify(publisher, times(1)).flush(); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java new file mode 100644 index 0000000..d649e6e --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSinkTest.java @@ -0,0 +1,95 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.Emitter; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.StageConfigurer; +import co.cask.cdap.etl.api.batch.BatchSinkContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRSinkTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private DMaaPMRSink dMaaPMRSink; + + @Before + public void before() { + dMaaPMRSink = new DMaaPMRSink(getTestDMaaPMRSinkPluginConfig()); + } + + @Test + public void testConfigurePipeline() throws Exception { + final PipelineConfigurer pipelineConfigurer = Mockito.mock(PipelineConfigurer.class); + final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class); + when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer); + when(stageConfigurer.getInputSchema()).thenReturn(getDMaaPMRSinkTestSchema()); + dMaaPMRSink.configurePipeline(pipelineConfigurer); + verify(stageConfigurer, times(1)).getInputSchema(); + } + + @Test(expected = CDAPSettingsException.class) + public void testConfigurePipelineWithInvalidSchema() throws Exception { + final PipelineConfigurer pipelineConfigurer = Mockito.mock(PipelineConfigurer.class); + final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class); + when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer); + when(stageConfigurer.getInputSchema()).thenReturn(Schema.recordOf( + "DMaaPMRSinkInvalidSchema", + Schema.Field.of("message1", Schema.of(Schema.Type.STRING)), + Schema.Field.of("field1", Schema.of(Schema.Type.STRING)) + )); + dMaaPMRSink.configurePipeline(pipelineConfigurer); + } + + @Test + public void testPrepareRun() throws Exception { + final BatchSinkContext batchSinkContext = Mockito.mock(BatchSinkContext.class); + dMaaPMRSink.prepareRun(batchSinkContext); + } + + @Test + @SuppressWarnings("unchecked") + public void testTransform() throws Exception { + final StructuredRecord structuredRecord = Mockito.mock(StructuredRecord.class); + final Emitter emitter = Mockito.mock(Emitter.class); + final String incomingTestMessage = "test message"; + when(structuredRecord.get( + eq(getTestDMaaPMRSinkPluginConfig().getMessageColumnName()))).thenReturn(incomingTestMessage); + doNothing().when(emitter).emit(any()); + dMaaPMRSink.transform(structuredRecord, emitter); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java new file mode 100644 index 0000000..5c80baa --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfigTest.java @@ -0,0 +1,80 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 1/23/2017. + */ +public class DMaaPMRSinkPluginConfigTest extends BaseAnalyticsCDAPPluginsUnitTest { + + @Test + public void testDMaaPMRSinkPluginConfigDefaults() throws Exception { + final DMaaPMRSinkPluginConfig sinkPluginConfig = new DMaaPMRSinkPluginConfig + (DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME, + DMAAP_MR_SINK_PLUGIN_TOPIC_NAME, DMAAP_MR_SINK_MESSAGE_COLUMN_NAME); + + assertThat(sinkPluginConfig.getReferenceName(), is(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME)); + assertThat(sinkPluginConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME)); + assertThat(sinkPluginConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME)); + assertThat(sinkPluginConfig.getMessageColumnName(), is(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME)); + assertNull(sinkPluginConfig.getPortNumber()); + assertNull(sinkPluginConfig.getProtocol()); + assertNull(sinkPluginConfig.getUserName()); + assertNull(sinkPluginConfig.getUserPassword()); + assertNull(sinkPluginConfig.getContentType()); + assertNull(sinkPluginConfig.getMaxBatchSize()); + assertNull(sinkPluginConfig.getMaxRecoveryQueueSize()); + } + + @Test + public void testDMaaPMRSinkPluginConfigCustom() throws Exception { + final DMaaPMRSinkPluginConfig sinkPluginConfig = getTestDMaaPMRSinkPluginConfig(); + assertThat(sinkPluginConfig.getReferenceName(), is(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME)); + assertThat(sinkPluginConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME)); + assertThat(sinkPluginConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME)); + assertThat(sinkPluginConfig.getPortNumber(), is(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER)); + assertThat(sinkPluginConfig.getProtocol(), is(DMAAP_MR_SINK_PLUGIN_PROTOCOL)); + assertThat(sinkPluginConfig.getUserName(), is(DMAAP_MR_SINK_PLUGIN_USERNAME)); + assertThat(sinkPluginConfig.getUserPassword(), is(DMAAP_MR_SINK_PLUGIN_PASSWORD)); + assertThat(sinkPluginConfig.getContentType(), is(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE)); + assertThat(sinkPluginConfig.getMessageColumnName(), is(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME)); + assertThat(sinkPluginConfig.getMaxBatchSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE)); + assertThat(sinkPluginConfig.getMaxRecoveryQueueSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE)); + } + + @Test + public void testValidToString() throws Exception { + final TestDMaaPMRSinkPluginConfig sinkPluginConfig = getTestDMaaPMRSinkPluginConfig(); + assertNotNull(sinkPluginConfig.toString()); + assertTrue(sinkPluginConfig.toString().contains(DMAAP_MR_SINK_PLUGIN_HOST_NAME)); + } + + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java new file mode 100644 index 0000000..bb4f7f6 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfigTest.java @@ -0,0 +1,84 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + + +/** + * @author Rajiv Singla . Creation Date: 1/23/2017. + */ +public class DMaaPMRSourcePluginConfigTest extends BaseAnalyticsCDAPPluginsUnitTest { + + @Test + public void testDMaaPMRSourcePluginConfigDefaults() throws Exception { + final DMaaPMRSourcePluginConfig sourcePluginConfig = new DMaaPMRSourcePluginConfig + (DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME, DMAAP_MR_SOURCE_PLUGIN_HOST_NAME, + DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME, DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL); + + assertThat(sourcePluginConfig.getReferenceName(), is(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME)); + assertThat(sourcePluginConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME)); + assertThat(sourcePluginConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME)); + assertThat(sourcePluginConfig.getPollingInterval(), is(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL)); + assertNull(sourcePluginConfig.getPortNumber()); + assertNull(sourcePluginConfig.getProtocol()); + assertNull(sourcePluginConfig.getUserName()); + assertNull(sourcePluginConfig.getUserPassword()); + assertNull(sourcePluginConfig.getContentType()); + assertNull(sourcePluginConfig.getConsumerGroup()); + assertNull(sourcePluginConfig.getConsumerId()); + assertNull(sourcePluginConfig.getMessageLimit()); + assertNull(sourcePluginConfig.getTimeoutMS()); + } + + @Test + public void testDMaaPMRSourcePluginConfigCustom() throws Exception { + final TestDMaaPMRSourcePluginConfig sourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + assertThat(sourcePluginConfig.getReferenceName(), is(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME)); + assertThat(sourcePluginConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME)); + assertThat(sourcePluginConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME)); + assertThat(sourcePluginConfig.getPollingInterval(), is(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL)); + assertThat(sourcePluginConfig.getPortNumber(), is(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER)); + assertThat(sourcePluginConfig.getProtocol(), is(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL)); + assertThat(sourcePluginConfig.getUserName(), is(DMAAP_MR_SOURCE_PLUGIN_USERNAME)); + assertThat(sourcePluginConfig.getUserPassword(), is(DMAAP_MR_SOURCE_PLUGIN_PASSWORD)); + assertThat(sourcePluginConfig.getContentType(), is(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE)); + assertThat(sourcePluginConfig.getConsumerGroup(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP)); + assertThat(sourcePluginConfig.getConsumerId(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID)); + assertThat(sourcePluginConfig.getMessageLimit(), is(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT)); + assertThat(sourcePluginConfig.getTimeoutMS(), is(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT)); + } + + @Test + public void testValidToString() throws Exception { + final TestDMaaPMRSourcePluginConfig sourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + assertNotNull(sourcePluginConfig.toString()); + assertTrue(sourcePluginConfig.toString().contains(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME)); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java new file mode 100644 index 0000000..8cba5e1 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSinkPluginConfig.java @@ -0,0 +1,76 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import javax.annotation.Nullable; + +/** + * Test {@link DMaaPMRSinkPluginConfig} for testing purposes only + * <p> + * @author Rajiv Singla . Creation Date: 1/23/2017. + */ +public class TestDMaaPMRSinkPluginConfig extends DMaaPMRSinkPluginConfig { + + public void setReferenceName(String referenceName) { + this.referenceName = referenceName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public void setPortNumber(@Nullable Integer portNumber) { + this.portNumber = portNumber; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setProtocol(@Nullable String protocol) { + this.protocol = protocol; + } + + public void setUserName(@Nullable String userName) { + this.userName = userName; + } + + public void setUserPassword(@Nullable String userPassword) { + this.userPassword = userPassword; + } + + public void setContentType(@Nullable String contentType) { + this.contentType = contentType; + } + + public void setMaxBatchSize(@Nullable Integer maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + public void setMaxRecoveryQueueSize(@Nullable Integer maxRecoveryQueueSize) { + this.maxRecoveryQueueSize = maxRecoveryQueueSize; + } + + public void setMessageColumnName(String messageColumnName) { + this.messageColumnName = messageColumnName; + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java new file mode 100644 index 0000000..c33f313 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/TestDMaaPMRSourcePluginConfig.java @@ -0,0 +1,84 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import javax.annotation.Nullable; + +/** + * Test {@link DMaaPMRSourcePluginConfig} for testing purposes only + * <p> + * @author Rajiv Singla . Creation Date: 1/23/2017. + */ +public class TestDMaaPMRSourcePluginConfig extends DMaaPMRSourcePluginConfig { + + public void setReferenceName(String referenceName) { + this.referenceName = referenceName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public void setPortNumber(@Nullable Integer portNumber) { + this.portNumber = portNumber; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setPollingInterval(Integer pollingInterval) { + this.pollingInterval = pollingInterval; + } + + public void setProtocol(@Nullable String protocol) { + this.protocol = protocol; + } + + public void setUserName(@Nullable String userName) { + this.userName = userName; + } + + public void setUserPassword(@Nullable String userPassword) { + this.userPassword = userPassword; + } + + public void setContentType(@Nullable String contentType) { + this.contentType = contentType; + } + + public void setConsumerId(@Nullable String consumerId) { + this.consumerId = consumerId; + } + + public void setConsumerGroup(@Nullable String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public void setTimeoutMS(@Nullable Integer timeoutMS) { + this.timeoutMS = timeoutMS; + } + + public void setMessageLimit(@Nullable Integer messageLimit) { + this.messageLimit = messageLimit; + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java new file mode 100644 index 0000000..efdd7ae --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/filter/TestJsonPathFilterPluginConfig.java @@ -0,0 +1,50 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter; + +/** + * @author Rajiv Singla . Creation Date: 3/3/2017. + */ +public class TestJsonPathFilterPluginConfig extends JsonPathFilterPluginConfig { + + public TestJsonPathFilterPluginConfig(final String referenceName, final String incomingJsonFieldName, + final String outputSchemaFieldName, final String jsonFilterMappings, + final String schema) { + super(referenceName, incomingJsonFieldName, outputSchemaFieldName, jsonFilterMappings, schema); + } + + + public void setIncomingJsonFieldName(String incomingJsonFieldName) { + this.incomingJsonFieldName = incomingJsonFieldName; + } + + public void setOutputSchemaFieldName(String outputSchemaFieldName) { + this.outputSchemaFieldName = outputSchemaFieldName; + } + + public void setJsonFilterMappings(String jsonFilterMappings) { + this.jsonFilterMappings = jsonFilterMappings; + } + + public void setSchema(String schema) { + this.schema = schema; + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java new file mode 100644 index 0000000..6bf3252 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/config/tca/TestSimpleTCAPluginConfig.java @@ -0,0 +1,56 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca; + +/** + * @author Rajiv Singla . Creation Date: 2/17/2017. + */ +public class TestSimpleTCAPluginConfig extends SimpleTCAPluginConfig { + + public TestSimpleTCAPluginConfig(String vesMessageFieldName, String policyJson, String alertFieldName, + String messageTypeFieldName, String schema, Boolean enableAlertCEFFormat) { + super(vesMessageFieldName, policyJson, alertFieldName, messageTypeFieldName, schema, enableAlertCEFFormat); + } + + public void setVesMessageFieldName(String vesMessageFieldName) { + this.vesMessageFieldName = vesMessageFieldName; + } + + public void setPolicyJson(String policyJson) { + this.policyJson = policyJson; + } + + public void setAlertFieldName(String alertFieldName) { + this.alertFieldName = alertFieldName; + } + + public void setMessageTypeFieldName(String messageTypeFieldName) { + this.messageTypeFieldName = messageTypeFieldName; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public void setEnableAlertCEFFormat(Boolean enableAlertCEFFormat) { + this.enableAlertCEFFormat = enableAlertCEFFormat; + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java new file mode 100644 index 0000000..d14e184 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchemaTest.java @@ -0,0 +1,63 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap; + +import co.cask.cdap.api.data.schema.Schema; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 1/25/2017. + */ +public class DMaaPSourceOutputSchemaTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testGetSchemaColumnName() throws Exception { + assertThat(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), is("ts")); + assertThat(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), is("responseCode")); + assertThat(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), is("responseMessage")); + assertThat(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), is("message")); + } + + @Test + public void testGetSchema() throws Exception { + final Schema schema = DMaaPSourceOutputSchema.getSchema(); + final List<Schema.Field> fields = schema.getFields(); + final List<String> fieldNames = new LinkedList<>(); + for (Schema.Field field : fields) { + fieldNames.add(field.getName()); + } + assertThat(fieldNames, hasItems( + DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), + DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), + DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), + DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName())); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java new file mode 100644 index 0000000..762e65a --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/it/SimpleTCAPluginCDAPIT.java @@ -0,0 +1,229 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.it; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.table.Table; +import co.cask.cdap.api.plugin.PluginClass; +import co.cask.cdap.api.plugin.PluginPropertyField; +import co.cask.cdap.common.utils.Tasks; +import co.cask.cdap.datapipeline.DataPipelineApp; +import co.cask.cdap.datapipeline.SmartWorkflow; +import co.cask.cdap.etl.api.batch.SparkCompute; +import co.cask.cdap.etl.mock.batch.MockSink; +import co.cask.cdap.etl.mock.batch.MockSource; +import co.cask.cdap.etl.mock.test.HydratorTestBase; +import co.cask.cdap.etl.proto.v2.ETLBatchConfig; +import co.cask.cdap.etl.proto.v2.ETLPlugin; +import co.cask.cdap.etl.proto.v2.ETLStage; +import co.cask.cdap.proto.artifact.AppRequest; +import co.cask.cdap.proto.artifact.ArtifactSummary; +import co.cask.cdap.proto.id.ApplicationId; +import co.cask.cdap.proto.id.ArtifactId; +import co.cask.cdap.proto.id.NamespaceId; +import co.cask.cdap.test.ApplicationManager; +import co.cask.cdap.test.DataSetManager; +import co.cask.cdap.test.WorkflowManager; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.onap.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin; +import org.onap.dcae.apod.analytics.common.validation.DCAEValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin + * + * @author Rajiv Singla . Creation Date: 2/17/2017. + */ +public class SimpleTCAPluginCDAPIT extends HydratorTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class); + + private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT"; + private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins"; + + protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", + "4.0.0"); + protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0"); + + private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)) + ); + + final Schema outputSchema = Schema.recordOf( + "outputSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)), + Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING)) + ); + + @BeforeClass + public static void setupTest() throws Exception { + + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + + // Enable the below code if you want to run the test in Intelli IDEA editor + // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID, + // SimpleTCAPlugin.class, SimpleTCAPluginConfig.class); + + // Enable the below code if you want to run the test via command line + ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact( + CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION); + + addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID, + ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class, + CDAPAppSettingsValidator.class, DCAEValidator.class); + } + + private static PluginClass getSimpleTCAPluginClass() { + final HashMap<String, PluginPropertyField> properties = new HashMap<>(); + properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "", + "string", false, false)); + properties.put("referenceName", new PluginPropertyField("referenceName", "", + "string", false, false)); + properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false)); + properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false)); + properties.put("messageTypeFieldName", new PluginPropertyField( + "messageTypeFieldName", "", "string", false, false)); + properties.put("enableAlertCEFFormat", new PluginPropertyField( + "enableAlertCEFFormat", "", "string", false, false)); + properties.put("schema", new PluginPropertyField( + "schema", "", "string", false, false)); + + return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(), + "pluginConfig", properties); + } + + + @AfterClass + public static void cleanup() { + } + + @Test + @SuppressWarnings("deprecation") + public void testTransform() throws Exception { + + LOG.info("Starting Test Transform"); + + final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json"); + final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json"); + + final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>() + .put("vesMessageFieldName", "message") + .put("referenceName", "SimpleTcaPlugin") + .put("policyJson", policyString) + .put("alertFieldName", "alert") + .put("messageTypeFieldName", "tcaMessageType") + .put("enableAlertCEFFormat", "true") + .put("schema", outputSchema.toString()) + .build(); + + final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema); + final ETLPlugin tcaPlugin = + new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null); + final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput"); + + final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *") + .addStage(new ETLStage("source", mockSourcePlugin)) + .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin)) + .addStage(new ETLStage("sink", mockSink)) + .addConnection("source", "simpleTCAPlugin") + .addConnection("simpleTCAPlugin", "sink") + .build(); + + AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig); + ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin"); + ApplicationManager appManager = deployApplication(appId.toId(), appRequest); + + List<StructuredRecord> sourceMessages = new ArrayList<>(); + StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema); + builder.set("message", cefMessage); + sourceMessages.add(builder.build()); + + // write records to source + DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages")); + MockSource.writeInput(inputManager, sourceMessages); + + // manually trigger the pipeline + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + workflowManager.start(); + workflowManager.waitForFinish(5, TimeUnit.MINUTES); + + final DataSetManager<Table> outputManager = getDataset("tcaOutput"); + + Tasks.waitFor( + TCACalculatorMessageType.COMPLIANT.name(), + new Callable<String>() { + @Override + public String call() throws Exception { + outputManager.flush(); + List<String> tcaOutputMessageType = new LinkedList<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString()); + final List<Schema.Field> fields = outputRecord.getSchema().getFields(); + LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord); + for (Schema.Field field : fields) { + LOG.debug("Field Name: {} - Field Type: {} ---> Field Value: {}", + field.getName(), field.getSchema().getType(), + outputRecord.get(field.getName())); + } + + } + return tcaOutputMessageType.get(0); + } + }, + 4, + TimeUnit.MINUTES); + + } + + private static String getFileContentAsString(final String fileLocation) throws Exception { + final URI tcaPolicyURI = + SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI(); + List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset()); + return Joiner.on("").join(lines); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java new file mode 100644 index 0000000..f7e379b --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPluginTest.java @@ -0,0 +1,119 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.sparkcompute.tca; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.StageConfigurer; +import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 2/17/2017. + */ +public class SimpleTCAPluginTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private SimpleTCAPlugin simpleTCAPlugin; + + @Before + public void before() { + final TestSimpleTCAPluginConfig testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig(); + Schema outputSchema = Schema.recordOf( + "TestSimpleTCAPluginInputSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)), + Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING)) + ); + testSimpleTCAPluginConfig.setSchema(outputSchema.toString()); + simpleTCAPlugin = new SimpleTCAPlugin(testSimpleTCAPluginConfig); + } + + @Test + public void testConfigurePipeline() throws Exception { + final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class); + final StageConfigurer stageConfigurer = mock(StageConfigurer.class); + when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer); + when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema()); + simpleTCAPlugin.configurePipeline(pipelineConfigurer); + verify(stageConfigurer, times(1)).getInputSchema(); + } + + @Test + public void testTransform() throws Exception { + + JavaSparkContext javaSparkContext = new JavaSparkContext("local", "test"); + + Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema", + Schema.Field.of("message", Schema.of(Schema.Type.STRING)) + ); + + // Inapplicable Message Structured Record + final StructuredRecord inapplicableSR = + StructuredRecord.builder(sourceSchema).set("message", "test").build(); + // compliant + final StructuredRecord compliantSR = + StructuredRecord.builder(sourceSchema).set("message", + fromStream(CEF_MESSAGE_JSON_FILE_LOCATION)).build(); + // non compliant + final String nonCompliantCEF = fromStream(CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION); + final StructuredRecord nonCompliantSR = + StructuredRecord.builder(sourceSchema).set("message", nonCompliantCEF).build(); + + final List<StructuredRecord> records = new LinkedList<>(); + records.add(inapplicableSR); + records.add(compliantSR); + records.add(nonCompliantSR); + + final JavaRDD<StructuredRecord> input = + javaSparkContext.parallelize(records); + final SparkExecutionPluginContext context = Mockito.mock(SparkExecutionPluginContext.class); + final MockStageMetrics stageMetrics = Mockito.mock(MockStageMetrics.class); + when(context.getMetrics()).thenReturn(stageMetrics); + final List<StructuredRecord> outputRecord = simpleTCAPlugin.transform(context, input).collect(); + assertNotNull(outputRecord); + assertThat(outputRecord.size(), is(3)); + + assertTrue(outputRecord.get(0).get("tcaMessageType").equals(TCACalculatorMessageType.INAPPLICABLE.toString())); + assertTrue(outputRecord.get(1).get("tcaMessageType").equals(TCACalculatorMessageType.COMPLIANT.toString())); + assertTrue(outputRecord.get(2).get("tcaMessageType").equals(TCACalculatorMessageType.NON_COMPLIANT.toString())); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java new file mode 100644 index 0000000..aca5581 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiverTest.java @@ -0,0 +1,75 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import com.google.common.collect.ImmutableList; +import org.apache.spark.storage.StorageLevel; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 1/24/2017. + */ +public class DMaaPMRReceiverTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testStoreStructuredRecords() throws Exception { + + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final TestDMaaPMRReceiver dMaaPMRReceiver = + new TestDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), testDMaaPMRSourcePluginConfig); + + final DMaaPMRSubscriber dMaaPMRSubscriber = Mockito.mock(DMaaPMRSubscriber.class); + final DMaaPMRSubscriberResponse subscriberResponse = Mockito.mock(DMaaPMRSubscriberResponse.class); + when(dMaaPMRSubscriber.fetchMessages()).thenReturn(subscriberResponse); + when(subscriberResponse.getFetchedMessages()).thenReturn(ImmutableList.of("Test Message")); + when(subscriberResponse.getResponseCode()).thenReturn(200); + when(subscriberResponse.getResponseMessage()).thenReturn("OK"); + dMaaPMRReceiver.storeStructuredRecords(dMaaPMRSubscriber); + verify(dMaaPMRSubscriber, times(1)).fetchMessages(); + verify(subscriberResponse, times(1)).getFetchedMessages(); + } + + @Test + public void testStoreStructuredRecordsWhenSubscriberThrowsException() throws Exception { + + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final TestDMaaPMRReceiver dMaaPMRReceiver = + new TestDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), testDMaaPMRSourcePluginConfig); + + final DMaaPMRSubscriber dMaaPMRSubscriber = Mockito.mock(DMaaPMRSubscriber.class); + final DMaaPMRSubscriberResponse subscriberResponse = Mockito.mock(DMaaPMRSubscriberResponse.class); + when(dMaaPMRSubscriber.fetchMessages()).thenThrow(DCAEAnalyticsRuntimeException.class); + dMaaPMRReceiver.storeStructuredRecords(dMaaPMRSubscriber); + verify(dMaaPMRSubscriber, times(1)).fetchMessages(); + verify(subscriberResponse, times(0)).getFetchedMessages(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java new file mode 100644 index 0000000..7b1f876 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSourceTest.java @@ -0,0 +1,91 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.StageConfigurer; +import co.cask.cdap.etl.api.streaming.StreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.receiver.Receiver; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 1/24/2017. + */ +public class DMaaPMRSourceTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private PipelineConfigurer pipelineConfigurer; + + @Before + public void before() { + pipelineConfigurer = Mockito.mock(PipelineConfigurer.class); + final StageConfigurer stageConfigurer = Mockito.mock(StageConfigurer.class); + when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer); + doNothing().when(stageConfigurer).setOutputSchema(any(Schema.class)); + } + + @Test + public void testDMaaPMRSourceConfigurePipelineWithValidPluginSettings() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig); + dMaaPMRSource.configurePipeline(pipelineConfigurer); + assertNotNull(dMaaPMRSource); + } + + @Test(expected = CDAPSettingsException.class) + public void testDMaaPMRSourceConfigurePipelineWithInvalidPluginSettings() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + // blank out DMaaP MR Source Host + testDMaaPMRSourcePluginConfig.setHostName(null); + final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig); + dMaaPMRSource.configurePipeline(pipelineConfigurer); + } + + + @Test + @SuppressWarnings("unchecked") + public void testGetStream() throws Exception { + final StreamingContext streamingContext = Mockito.mock(StreamingContext.class); + final JavaStreamingContext javaStreamingContext = Mockito.mock(JavaStreamingContext.class); + final JavaReceiverInputDStream dMaaPMRReceiver = Mockito.mock(JavaReceiverInputDStream.class); + when(streamingContext.getSparkStreamingContext()).thenReturn(javaStreamingContext); + when(javaStreamingContext.receiverStream(any(Receiver.class))).thenReturn(dMaaPMRReceiver); + + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final DMaaPMRSource dMaaPMRSource = new DMaaPMRSource(testDMaaPMRSourcePluginConfig); + final JavaDStream<StructuredRecord> stream = dMaaPMRSource.getStream(streamingContext); + assertNotNull(stream); + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java new file mode 100644 index 0000000..cd3e4bd --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiverTest.java @@ -0,0 +1,81 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import org.apache.spark.storage.StorageLevel; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema; + +import java.util.concurrent.TimeUnit; + +/** + * @author Rajiv Singla . Creation Date: 2/20/2017. + */ +public class MockDMaaPMRReceiverTest extends BaseAnalyticsCDAPPluginsUnitTest { + + protected class TestMockDMaaPMRReceiverTest extends MockDMaaPMRReceiver { + + private boolean canStop = false; + + public TestMockDMaaPMRReceiverTest(StorageLevel storageLevel, DMaaPMRSourcePluginConfig pluginConfig) { + super(storageLevel, pluginConfig); + } + + @Override + public boolean isStopped() { + return canStop; + } + + @Override + public void store(StructuredRecord dataItem) { + LOG.debug("Mocking storing dataItem - {}", + dataItem.get(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName())); + } + + public void setCanStop(boolean canStop) { + this.canStop = canStop; + } + } + + @Test + public void testStoreStructuredRecords() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + testDMaaPMRSourcePluginConfig.setPollingInterval(100); + final TestMockDMaaPMRReceiverTest mockDMaaPMRReceiver = new TestMockDMaaPMRReceiverTest(StorageLevel + .MEMORY_ONLY(), + testDMaaPMRSourcePluginConfig); + new Thread(new Runnable() { + @Override + public void run() { + mockDMaaPMRReceiver.storeStructuredRecords(null); + } + }).start(); + TimeUnit.MILLISECONDS.sleep(1000); + LOG.info("Killing Mock Subscriber after 1 ms"); + mockDMaaPMRReceiver.setCanStop(true); + + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java new file mode 100644 index 0000000..6a842d0 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSourceTest.java @@ -0,0 +1,74 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.etl.api.streaming.StreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.receiver.Receiver; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 2/20/2017. + */ +@SuppressWarnings("unchecked") +public class MockDMaaPMRSourceTest extends BaseAnalyticsCDAPPluginsUnitTest { + + @Test + public void testGetStream() throws Exception { + final StreamingContext streamingContext = Mockito.mock(StreamingContext.class); + final JavaStreamingContext javaStreamingContext = Mockito.mock(JavaStreamingContext.class); + final JavaReceiverInputDStream dMaaPMRReceiver = Mockito.mock(JavaReceiverInputDStream.class); + when(streamingContext.getSparkStreamingContext()).thenReturn(javaStreamingContext); + when(javaStreamingContext.receiverStream(any(Receiver.class))).thenReturn(dMaaPMRReceiver); + + MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(getTestDMaaPMRSourcePluginConfig()); + final JavaDStream<StructuredRecord> stream = mockDMaaPMRSource.getStream(streamingContext); + assertNotNull(stream); + } + + @Test(expected = CDAPSettingsException.class) + public void testConfigurePipelineWhenPollingIntervalNotPresent() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + testDMaaPMRSourcePluginConfig.setPollingInterval(null); + final MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(testDMaaPMRSourcePluginConfig); + mockDMaaPMRSource.configurePipeline(null); + } + + @Test + public void testConfigurePipelineWhenPollingIntervalIsPresent() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final MockDMaaPMRSource mockDMaaPMRSource = new MockDMaaPMRSource(testDMaaPMRSourcePluginConfig); + mockDMaaPMRSource.configurePipeline(null); + assertNotNull(mockDMaaPMRSource); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java new file mode 100644 index 0000000..0d87496 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/TestDMaaPMRReceiver.java @@ -0,0 +1,58 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.metrics.Metrics; +import org.apache.spark.storage.StorageLevel; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; + +/** + * Test implementation for {@link DMaaPMRReceiver} + * <p> + * @author Rajiv Singla . Creation Date: 1/24/2017. + */ +public class TestDMaaPMRReceiver extends DMaaPMRReceiver { + + protected static Metrics metrics; + + static { + metrics = Mockito.mock(Metrics.class); + doNothing().when(metrics).count(anyString(), anyInt()); + doNothing().when(metrics).gauge(anyString(), anyInt()); + } + + + public TestDMaaPMRReceiver(StorageLevel storageLevel, DMaaPMRSourcePluginConfig pluginConfig) { + + super(storageLevel, pluginConfig, metrics); + } + + @Override + public void store(StructuredRecord dataItem) { + // do nothing + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java new file mode 100644 index 0000000..8bf91c8 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilterTest.java @@ -0,0 +1,84 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.transform.filter; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.Emitter; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.StageConfigurer; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; + +import java.util.Date; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 3/3/2017. + */ +public class JsonPathFilterTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testInitializeWhenFilterMappingIsValid() throws Exception { + final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig()); + jsonPathFilter.initialize(null); + } + + + @Test + public void configurePipeline() throws Exception { + final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig()); + final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class); + final StageConfigurer stageConfigurer = mock(StageConfigurer.class); + when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer); + when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema()); + doNothing().when(stageConfigurer).setOutputSchema(any(Schema.class)); + jsonPathFilter.configurePipeline(pipelineConfigurer); + verify(stageConfigurer, times(1)).setOutputSchema(any(Schema.class)); + } + + @Test + public void testTransform() throws Exception { + final JsonPathFilter jsonPathFilter = new JsonPathFilter(getJsonPathFilterPluginConfig()); + jsonPathFilter.initialize(null); + final StructuredRecord inputSR = StructuredRecord.builder(getJsonFilterPluginInputSchema()) + .set("ts", new Date().getTime()) + .set("responseCode", 200) + .set("responseMessage", "OK") + .set("message", fromStream(CEF_MESSAGE_JSON_FILE_LOCATION)) + .build(); + + final Emitter emitter = Mockito.mock(Emitter.class); + doNothing().when(emitter).emit(ArgumentMatchers.any(StructuredRecord.class)); + jsonPathFilter.transform(inputSR, emitter); + verify(emitter, times(1)).emit(any(StructuredRecord.class)); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java new file mode 100644 index 0000000..2792e17 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtilsTest.java @@ -0,0 +1,171 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.utils; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class CDAPPluginUtilsTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testValidateSchemaContainsFieldsWhenSchemaIsNotNull() throws Exception { + final Schema dMaaPMRSinkTestSchema = getDMaaPMRSinkTestSchema(); + CDAPPluginUtils.validateSchemaContainsFields(dMaaPMRSinkTestSchema, "message"); + } + + @Test + public void testValidateSchemaContainsFieldsWhenInputSchemaIsNull() throws Exception { + CDAPPluginUtils.validateSchemaContainsFields(null, "message"); + } + + @Test + public void testCreateStructuredRecord() throws Exception { + final StructuredRecord testMessage = CDAPPluginUtils.createDMaaPMRResponseStructuredRecord("testMessage"); + assertNotNull(testMessage); + } + + + @Test + public void testCreateOutputStructuredRecordBuilder() throws Exception { + + final String messageFieldName = "message"; + final String firstInputFieldName = "inputField1"; + final String secondInputFieldName = "inputField2"; + + + final Schema inputSchema = Schema.recordOf( + "inputSchema", + Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)), + Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(secondInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))) + ); + + final String addedFieldName = "addedField"; + final Schema outputSchema = Schema.recordOf( + "outputSchema", + Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)), + Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(addedFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))) // added field + // missing second Input Field + ); + + // input structured record + final String messageFieldValue = "Message String"; + final String firstFieldValue = "Input Field 1"; + final String secondFieldValue = "Input Field 2"; + final StructuredRecord inputSR = StructuredRecord.builder(inputSchema) + .set(messageFieldName, messageFieldValue) + .set(firstInputFieldName, firstFieldValue) + .set(secondInputFieldName, secondFieldValue) + .build(); + + final StructuredRecord.Builder outputStructuredRecordBuilder = + CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputSR); + + final String addedFieldValue = "Added Field Value"; + final StructuredRecord outputSR = outputStructuredRecordBuilder + .set(addedFieldName, addedFieldValue) + .build(); + + assertThat("Added Field field value copied correctly", + outputSR.get(addedFieldName).toString(), is(addedFieldValue)); + + assertThat("Output SR has message field copied correctly", + outputSR.get(messageFieldName).toString(), is(messageFieldValue)); + + assertThat("First Field value copied correctly", + outputSR.get(firstInputFieldName).toString(), is(firstFieldValue)); + + assertNull("Second Field value is null as output schema does not have the field", + outputSR.get(secondInputFieldName)); + + } + + + @Test + public void testAddFieldValueToStructuredRecordBuilder() throws Exception { + + final String messageFieldName = "message"; + final String firstInputFieldName = "inputField1"; + final String addedFieldName = "addedField"; + final String firstFieldValue = "Input Field 1"; + final String addedFieldValue = "Added Field Value"; + final Schema outputSchema = Schema.recordOf( + "outputSchema", + Schema.Field.of(messageFieldName, Schema.of(Schema.Type.STRING)), + Schema.Field.of(firstInputFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(addedFieldName, Schema.nullableOf(Schema.of(Schema.Type.STRING))) // added field + ); + + final StructuredRecord.Builder outputSRBuilder = StructuredRecord.builder(outputSchema) + .set(messageFieldName, "Some message") + .set(firstInputFieldName, firstFieldValue); + + final StructuredRecord.Builder addedFieldSRBuilder = CDAPPluginUtils.addFieldValueToStructuredRecordBuilder( + outputSRBuilder, outputSchema, addedFieldName, addedFieldValue); + + // Try adding field to output Structured record that is not in output schema + final String nonExistentFieldName = "fieldNotInOutputSchema"; + final String nonExistentFieldValue = "Some Value"; + final StructuredRecord outputSR = CDAPPluginUtils.addFieldValueToStructuredRecordBuilder( + addedFieldSRBuilder, outputSchema, nonExistentFieldName, nonExistentFieldValue).build(); + + assertThat("Output SR must contain added Field which is in output schema", + outputSR.get(addedFieldName).toString(), is(addedFieldValue)); + assertNull("Output SR must not contain field that is not in output schema", + outputSR.get(nonExistentFieldName)); + + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testValidateSchemaFieldTypeWhenInputSchemaIsNotValidJson() throws Exception { + CDAPPluginUtils.validateSchemaFieldType("Invalid Schema", "field1", Schema.Type.STRING); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testSetOutputSchemaWhenOutputSchemaIsNotValidJson() throws Exception { + CDAPPluginUtils.setOutputSchema(null, "Invalid output Schema"); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testExtractFieldMappingsWhenFieldMappingValueIsEmpty() throws Exception { + CDAPPluginUtils.extractFieldMappings("path1:,path2:value2"); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testExtractFieldMappingsWhenFieldMappingAreBlank() throws Exception { + CDAPPluginUtils.extractFieldMappings("path1: ,path2:value2"); + } + + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java new file mode 100644 index 0000000..9b6f7f8 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapperTest.java @@ -0,0 +1,57 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.utils; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPSinkConfigMapperTest extends BaseAnalyticsCDAPPluginsUnitTest { + + + @Test + public void testMapToPublisherConfig() throws Exception { + + final Configuration testConfiguration = getTestConfiguration(); + final DMaaPMRPublisherConfig publisherConfig = DMaaPSinkConfigMapper.map(testConfiguration); + + assertNotNull(publisherConfig); + assertThat(publisherConfig.getHostName(), is(DMAAP_MR_SINK_PLUGIN_HOST_NAME)); + assertThat(publisherConfig.getTopicName(), is(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME)); + assertThat(publisherConfig.getPortNumber(), is(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER)); + assertThat(publisherConfig.getProtocol(), is(DMAAP_MR_SINK_PLUGIN_PROTOCOL)); + assertThat(publisherConfig.getUserName(), is(DMAAP_MR_SINK_PLUGIN_USERNAME)); + assertThat(publisherConfig.getUserPassword(), is(DMAAP_MR_SINK_PLUGIN_PASSWORD)); + assertThat(publisherConfig.getContentType(), is(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE)); + assertThat(publisherConfig.getMaxBatchSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE)); + assertThat(publisherConfig.getMaxRecoveryQueueSize(), is(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE)); + + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java new file mode 100644 index 0000000..6e79f47 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapperTest.java @@ -0,0 +1,64 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.utils; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 1/24/2017. + */ +public class DMaaPSourceConfigMapperTest extends BaseAnalyticsCDAPPluginsUnitTest { + + @Test + public void testMapToSubscriberConfig() throws Exception { + + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + final DMaaPMRSubscriberConfig subscriberConfig = DMaaPSourceConfigMapper.map(testDMaaPMRSourcePluginConfig); + + assertNotNull(subscriberConfig); + assertThat(subscriberConfig.getHostName(), is(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME)); + assertThat(subscriberConfig.getTopicName(), is(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME)); + assertThat(subscriberConfig.getPortNumber(), is(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER)); + assertThat(subscriberConfig.getProtocol(), is(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL)); + assertThat(subscriberConfig.getUserName(), is(DMAAP_MR_SOURCE_PLUGIN_USERNAME)); + assertThat(subscriberConfig.getUserPassword(), is(DMAAP_MR_SOURCE_PLUGIN_PASSWORD)); + assertThat(subscriberConfig.getContentType(), is(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE)); + assertThat(subscriberConfig.getConsumerGroup(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP)); + assertThat(subscriberConfig.getConsumerId(), is(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID)); + assertThat(subscriberConfig.getMessageLimit(), is(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT)); + assertThat(subscriberConfig.getTimeoutMS(), is(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT)); + } + + @Test(expected = IllegalStateException.class) + public void testMapToSubscriberConfigWhenSubscriberHostNameIsEmpty() throws Exception { + final TestDMaaPMRSourcePluginConfig testDMaaPMRSourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + testDMaaPMRSourcePluginConfig.setHostName(null); + DMaaPSourceConfigMapper.map(testDMaaPMRSourcePluginConfig); + + } +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java new file mode 100644 index 0000000..904a74b --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidatorTest.java @@ -0,0 +1,86 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.validator; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRSinkPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private TestDMaaPMRSinkPluginConfig sinkPluginConfig; + private DMaaPMRSinkPluginConfigValidator sinkPluginConfigValidator; + + @Before + public void before() { + sinkPluginConfigValidator = new DMaaPMRSinkPluginConfigValidator(); + sinkPluginConfig = getTestDMaaPMRSinkPluginConfig(); + } + + @Test + public void validateAppSettingsWithValidDMaaPSinkConfig() throws Exception { + final GenericValidationResponse<DMaaPMRSinkPluginConfig> validationResponse = + sinkPluginConfigValidator.validateAppSettings(sinkPluginConfig); + assertFalse(validationResponse.hasErrors()); + } + + + @Test + public void validateAppSettingsWithValidDMaaPSinkConfigWhenHostNameIsNotPresent() throws Exception { + sinkPluginConfig.setHostName(null); + assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSinkConfigWhenHostPortIsNotPresent() throws Exception { + sinkPluginConfig.setPortNumber(null); + assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSinkConfigWhenTopicNameIsNotPresent() throws Exception { + sinkPluginConfig.setTopicName(null); + assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSinkConfigWhenColumnNameIsNotPresent() throws Exception { + sinkPluginConfig.setMessageColumnName(null); + assertResponseHasErrors(sinkPluginConfig, sinkPluginConfigValidator); + } + + private static void assertResponseHasErrors(final TestDMaaPMRSinkPluginConfig sinkPluginConfig, + final DMaaPMRSinkPluginConfigValidator validator) { + final GenericValidationResponse validationResponse = validator.validateAppSettings(sinkPluginConfig); + assertTrue(validationResponse.hasErrors()); + } + + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java new file mode 100644 index 0000000..b4357e8 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidatorTest.java @@ -0,0 +1,85 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.validator; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRSourcePluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private TestDMaaPMRSourcePluginConfig sourcePluginConfig; + private DMaaPMRSourcePluginConfigValidator sourcePluginConfigValidator; + + @Before + public void before() { + sourcePluginConfigValidator = new DMaaPMRSourcePluginConfigValidator(); + sourcePluginConfig = getTestDMaaPMRSourcePluginConfig(); + } + + @Test + public void validateAppSettingsWithValidDMaaPSourceConfig() throws Exception { + final GenericValidationResponse<DMaaPMRSourcePluginConfig> validationResponse = + sourcePluginConfigValidator.validateAppSettings(sourcePluginConfig); + assertFalse(validationResponse.hasErrors()); + } + + + @Test + public void validateAppSettingsWithValidDMaaPSourceConfigWhenHostNameIsNotPresent() throws Exception { + sourcePluginConfig.setHostName(null); + assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSourceConfigWhenHostPortIsNotPresent() throws Exception { + sourcePluginConfig.setPortNumber(null); + assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSourceConfigWhenTopicNameIsNotPresent() throws Exception { + sourcePluginConfig.setTopicName(null); + assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator); + } + + @Test + public void validateAppSettingsWithValidDMaaPSourcePollingIntervalIsNotPresent() throws Exception { + sourcePluginConfig.setPollingInterval(null); + assertResponseHasErrors(sourcePluginConfig, sourcePluginConfigValidator); + } + + private static void assertResponseHasErrors(final TestDMaaPMRSourcePluginConfig sourcePluginConfig, + final DMaaPMRSourcePluginConfigValidator validator) { + final GenericValidationResponse validationResponse = validator.validateAppSettings(sourcePluginConfig); + assertTrue(validationResponse.hasErrors()); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java new file mode 100644 index 0000000..4eff03c --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidatorTest.java @@ -0,0 +1,107 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.validator; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 3/3/2017. + */ +public class JsonPathFilterPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private TestJsonPathFilterPluginConfig jsonPathFilterPluginConfig; + private JsonPathFilterPluginConfigValidator jsonPathFilterPluginConfigValidator; + + @Before + public void before() { + jsonPathFilterPluginConfig = getJsonPathFilterPluginConfig(); + jsonPathFilterPluginConfigValidator = new JsonPathFilterPluginConfigValidator(); + } + + + @Test + public void testValidateAppSettingsWhenNoValidationErrors() throws Exception { + final GenericValidationResponse<JsonPathFilterPluginConfig> validationResponse = + jsonPathFilterPluginConfigValidator.validateAppSettings(jsonPathFilterPluginConfig); + assertFalse(validationResponse.hasErrors()); + } + + @Test + public void testValidateAppSettingsWhenFilterMappingsAreEmpty() throws Exception { + jsonPathFilterPluginConfig.setJsonFilterMappings(""); + assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenOutputSchemaIsNotPresent() throws Exception { + jsonPathFilterPluginConfig.setSchema(null); + assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenOutputSchemaFilterMatchedFieldIsNotBoolean() throws Exception { + final String outputSchemaWithMatchedFieldNotBoolean = + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\",\"fields\":" + + "[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"filterMatched\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}" + + "]" + + "}"; + jsonPathFilterPluginConfig.setSchema(outputSchemaWithMatchedFieldNotBoolean); + assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenOutputSchemaFilterMatchedFieldIsNotNullable() throws Exception { + final String outputSchemaWithMatchedFieldNotNullable = + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\",\"fields\":" + + "[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"filterMatched\",\"type\":\"boolean\"}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}" + + "]" + + "}"; + jsonPathFilterPluginConfig.setSchema(outputSchemaWithMatchedFieldNotNullable); + assertResponseHasErrors(jsonPathFilterPluginConfig, jsonPathFilterPluginConfigValidator); + } + + private static void assertResponseHasErrors(final TestJsonPathFilterPluginConfig jsonPluginConfig, + final JsonPathFilterPluginConfigValidator validator) { + final GenericValidationResponse validationResponse = validator.validateAppSettings(jsonPluginConfig); + assertTrue(validationResponse.hasErrors()); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java new file mode 100644 index 0000000..284bec3 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/test/java/org/onap/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidatorTest.java @@ -0,0 +1,157 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.plugins.validator; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; +import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Rajiv Singla . Creation Date: 2/21/2017. + */ +public class SimpleTCAPluginConfigValidatorTest extends BaseAnalyticsCDAPPluginsUnitTest { + + private TestSimpleTCAPluginConfig testSimpleTCAPluginConfig; + private SimpleTCAPluginConfigValidator simpleTCAPluginConfigValidator; + + @Before + public void before() { + testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig(); + simpleTCAPluginConfigValidator = new SimpleTCAPluginConfigValidator(); + } + + @Test + public void testValidateAppSettingsWhenAllSettingsAreValid() throws Exception { + final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse = + simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig); + assertFalse(validationResponse.hasErrors()); + } + + @Test + public void testValidateAppSettingsWhenVESMessageFieldNameIsMissing() throws Exception { + testSimpleTCAPluginConfig.setVesMessageFieldName(null); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenPolicyJsonIsMissing() throws Exception { + testSimpleTCAPluginConfig.setPolicyJson(null); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenAlertFieldNameIsMissing() throws Exception { + testSimpleTCAPluginConfig.setAlertFieldName(null); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenOutputSchemaIsNull() throws Exception { + testSimpleTCAPluginConfig.setSchema(null); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenMessageTypeFieldNameIsMissing() throws Exception { + testSimpleTCAPluginConfig.setMessageTypeFieldName(null); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenAlertFieldIsNullableInOutputSchema() throws Exception { + testSimpleTCAPluginConfig.setSchema( + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\"," + + "\"fields\":[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}," + + "{\"name\":\"alert\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}"); + final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse = + simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig); + assertFalse(validationResponse.hasErrors()); + + } + + @Test + public void testValidateAppSettingsWhenAlertFieldIsNotPresentInOutputSchema() throws Exception { + testSimpleTCAPluginConfig.setSchema( + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\"," + + "\"fields\":[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}," + + "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}"); + final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse = + simpleTCAPluginConfigValidator.validateAppSettings(testSimpleTCAPluginConfig); + assertFalse(validationResponse.hasErrors()); + + } + + @Test + public void testValidateAppSettingsWhenAlertFieldIsNullableButNotStringTypeInOutputSchema() throws Exception { + testSimpleTCAPluginConfig.setSchema( + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\"," + + "\"fields\":[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}," + + "{\"name\":\"alert\",\"type\":[\"int\",\"null\"]}," + + "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}"); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + @Test + public void testValidateAppSettingsWhenAlertFieldNameIsNotNullableInOutputSchema() throws Exception { + testSimpleTCAPluginConfig.setSchema( + "{\"type\":\"record\"," + + "\"name\":\"etlSchemaBody\"," + + "\"fields\":[" + + "{\"name\":\"ts\",\"type\":\"long\"}," + + "{\"name\":\"responseCode\",\"type\":\"int\"}," + + "{\"name\":\"responseMessage\",\"type\":\"string\"}," + + "{\"name\":\"message\",\"type\":\"string\"}," + + "{\"name\":\"alert\",\"type\":\"string\"}," + + "{\"name\":\"tcaMessageType\",\"type\":\"string\"}]}"); + assertResponseHasErrors(testSimpleTCAPluginConfig, simpleTCAPluginConfigValidator); + } + + + + private static void assertResponseHasErrors(final TestSimpleTCAPluginConfig pluginConfig, + final SimpleTCAPluginConfigValidator validator) { + final GenericValidationResponse validationResponse = validator.validateAppSettings(pluginConfig); + assertTrue(validationResponse.hasErrors()); + LOG.debug("Validation Error Message: {}", validationResponse.getAllErrorMessage()); + } +} |