aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-plugins/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-plugins/src/main/java')
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java94
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java116
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java58
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java90
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java37
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java159
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java101
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java134
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java125
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java154
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java59
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java175
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java118
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java70
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java132
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java73
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java134
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java295
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java112
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java118
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java72
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java58
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java58
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java83
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java91
25 files changed, 2716 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java
new file mode 100644
index 0000000..c89f424
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSinkConfigMapper;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+
+import java.io.IOException;
+
+/**
+ * DMaaP MR Output format used by DMaaP MR Sink Plugin to create a MR Publisher and pass to custom {@link
+ * DMaaPMRRecordWriter}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/27/2017.
+ */
+public class DMaaPMROutputFormat extends OutputFormat<String, NullWritable> {
+
+ @Override
+ public RecordWriter<String, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ final Configuration configuration = context.getConfiguration();
+ final DMaaPMRPublisherConfig publisherConfig = DMaaPSinkConfigMapper.map(configuration);
+ final DMaaPMRPublisher publisher = DMaaPMRFactory.create().createPublisher(publisherConfig);
+ return new DMaaPMRRecordWriter(publisher);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ // do nothing
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new NoOpOutputCommitter();
+ }
+
+ /**
+ * A dummy implementation for {@link OutputCommitter} that does nothing.
+ */
+ protected static class NoOpOutputCommitter extends OutputCommitter {
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ // no op
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ // no op
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ // no op
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ // no op
+ }
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java
new file mode 100644
index 0000000..a78d42f
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java
@@ -0,0 +1,116 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import co.cask.cdap.api.data.batch.OutputFormatProvider;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * DMaaP MR Output Format Provider used to create Batch Sink Plugin
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/27/2017.
+ */
+public class DMaaPMROutputFormatProvider implements OutputFormatProvider {
+
+ private final Map<String, String> sinkConfig;
+
+
+ public DMaaPMROutputFormatProvider(DMaaPMRSinkPluginConfig sinkPluginConfig) {
+
+ // initialize Sink Config - with DMaaP MR Publisher config values
+ sinkConfig = new LinkedHashMap<>();
+
+ // Required fields for sink config
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.HOST_NAME, sinkPluginConfig.getHostName());
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, sinkPluginConfig.getTopicName());
+
+ final Integer configPortNumber = sinkPluginConfig.getPortNumber();
+ if (configPortNumber != null) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, configPortNumber.toString());
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
+ AnalyticsConstants.DEFAULT_PORT_NUMBER.toString());
+ }
+
+ final String configProtocol = sinkPluginConfig.getProtocol();
+ if (ValidationUtils.isPresent(configProtocol)) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, configProtocol);
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, AnalyticsConstants.DEFAULT_PROTOCOL);
+ }
+
+
+ final String configUserName = sinkPluginConfig.getUserName();
+ if (ValidationUtils.isPresent(configUserName)) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, configUserName);
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, AnalyticsConstants.DEFAULT_USER_NAME);
+ }
+
+ final String configUserPass = sinkPluginConfig.getUserPassword();
+ if (ValidationUtils.isPresent(configUserPass)) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, configUserPass);
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, AnalyticsConstants.DEFAULT_USER_PASSWORD);
+ }
+
+ final String configContentType = sinkPluginConfig.getContentType();
+ if (ValidationUtils.isPresent(configContentType)) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, configContentType);
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, AnalyticsConstants.DEFAULT_CONTENT_TYPE);
+ }
+
+
+ final Integer configMaxBatchSize = sinkPluginConfig.getMaxBatchSize();
+ if (configMaxBatchSize != null) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, configMaxBatchSize.toString());
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
+ String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE));
+ }
+
+ final Integer configMaxRecoveryQueueSize = sinkPluginConfig.getMaxRecoveryQueueSize();
+ if (configMaxRecoveryQueueSize != null) {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, configMaxRecoveryQueueSize.toString());
+ } else {
+ sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
+ String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE));
+ }
+
+ }
+
+ @Override
+ public String getOutputFormatClassName() {
+ return DMaaPMROutputFormat.class.getName();
+ }
+
+ @Override
+ public Map<String, String> getOutputFormatConfiguration() {
+ return sinkConfig;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java
new file mode 100644
index 0000000..ec0aded
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java
@@ -0,0 +1,58 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A simple implementation of {@link RecordWriter} which writes messages to DMaaP MR topic
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/27/2017.
+ */
+public class DMaaPMRRecordWriter extends RecordWriter<String, NullWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRRecordWriter.class);
+
+ private final DMaaPMRPublisher dMaaPMRPublisher;
+
+ public DMaaPMRRecordWriter(DMaaPMRPublisher dMaaPMRPublisher) {
+ this.dMaaPMRPublisher = dMaaPMRPublisher;
+ }
+
+ @Override
+ public void write(String message, NullWritable value) throws IOException, InterruptedException {
+ LOG.debug("Writing message to DMaaP MR Topic: {}", message);
+ dMaaPMRPublisher.publish(Arrays.asList(message));
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ dMaaPMRPublisher.flush();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java
new file mode 100644
index 0000000..32ec251
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java
@@ -0,0 +1,90 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Name;
+import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.batch.Output;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.api.dataset.lib.KeyValue;
+import co.cask.cdap.etl.api.Emitter;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.batch.BatchSink;
+import co.cask.cdap.etl.api.batch.BatchSinkContext;
+import org.apache.hadoop.io.NullWritable;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSinkPluginConfigValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/26/2017.
+ */
+@Plugin(type = BatchSink.PLUGIN_TYPE)
+@Name("DMaaPMRSink")
+@Description("A batch sink Plugin that publishes messages to DMaaP MR Topic.")
+public class DMaaPMRSink extends BatchSink<StructuredRecord, String, NullWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSink.class);
+
+ private final DMaaPMRSinkPluginConfig pluginConfig;
+
+ public DMaaPMRSink(final DMaaPMRSinkPluginConfig pluginConfig) {
+ LOG.debug("Creating DMaaP MR Sink Plugin with plugin Config: {}", pluginConfig);
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSinkPluginConfigValidator());
+ // validates that input schema contains the field provided in Sink Message Column Name property
+ final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
+ CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getMessageColumnName());
+ }
+
+
+ @Override
+ public void prepareRun(BatchSinkContext context) throws Exception {
+ context.addOutput(Output.of(pluginConfig.getReferenceName(), new DMaaPMROutputFormatProvider(pluginConfig)));
+ }
+
+ @Override
+ public void transform(StructuredRecord structuredRecord,
+ Emitter<KeyValue<String, NullWritable>> emitter) throws Exception {
+ // get incoming message from structured record
+ final String incomingMessage = structuredRecord.get(pluginConfig.getMessageColumnName());
+
+ // if incoming messages does not have message column name log warning as it should not happen
+ if (incomingMessage == null) {
+ LOG.warn("Column Name: {}, contains no message.Skipped for DMaaP MR Publishing....",
+ pluginConfig.getMessageColumnName());
+ } else {
+
+ // emit the messages as key
+ emitter.emit(new KeyValue<String, NullWritable>(incomingMessage, null));
+ }
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java
new file mode 100644
index 0000000..677b764
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java
@@ -0,0 +1,37 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.common;
+
+/**
+ * Contract interface for all DCAE Analytics Plugin Schemas
+ *
+ * @author Rajiv Singla . Creation Date: 1/25/2017.
+ */
+public interface PluginSchema {
+
+ /**
+ * Provides column name that will be used in Schema Definition
+ *
+ * @return Column name that will be used in Schema Definition
+ */
+ String getSchemaColumnName();
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java
new file mode 100644
index 0000000..b85dc7d
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java
@@ -0,0 +1,159 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import com.google.common.base.Objects;
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base class for all DMaaP MR Configs
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/17/2017.
+ */
+public abstract class BaseDMaaPMRPluginConfig extends CDAPBasePluginConfig {
+
+ @Description("DMaaP Message Router HostName")
+ @Macro
+ protected String hostName;
+
+ @Description("DMaaP Message Router Host Port number. Defaults to Port 80")
+ @Nullable
+ @Macro
+ protected Integer portNumber;
+
+ @Description("DMaaP Message Router Topic Name")
+ @Macro
+ protected String topicName;
+
+ @Description("DMaaP Message Router HTTP Protocol e.g. HTTP or HTTPS. Defaults to HTTPS")
+ @Nullable
+ @Macro
+ protected String protocol;
+
+ @Description("DMaaP Message Router User Name used for AAF Authentication. Defaults to no authentication")
+ @Nullable
+ @Macro
+ protected String userName;
+
+ @Description("DMaaP Message Router User Password used for AAF Authentication. Defaults to no authentication")
+ @Nullable
+ @Macro
+ protected String userPassword;
+
+ @Description("DMaaP Message Router Content Type. Defaults to 'application/json'")
+ @Nullable
+ @Macro
+ protected String contentType;
+
+
+ public BaseDMaaPMRPluginConfig(final String referenceName, final String hostName, final String topicName) {
+ this.referenceName = referenceName;
+ this.hostName = hostName;
+ this.topicName = topicName;
+ }
+
+ /**
+ * Host Name for DMaaP MR Publisher or Subscriber
+ *
+ * @return host name
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
+ /**
+ * Port Number for DMaaP MR Publisher or Subscriber
+ *
+ * @return port number
+ */
+ @Nullable
+ public Integer getPortNumber() {
+ return portNumber;
+ }
+
+ /**
+ * DMaaP MR Topic Name for Subscriber or Publisher
+ *
+ * @return topic name
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+
+ /**
+ * DMaaP MR HTTP or HTTPS protocol
+ *
+ * @return http or https protocol
+ */
+ @Nullable
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * User name used for DMaaP MR AAF Authentication
+ *
+ * @return User name for DMaaP MR AAF Authentication
+ */
+ @Nullable
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * User password used for DMaaP MR AAF Authentication
+ *
+ * @return User password used for DMaaP MR AAF Authentication
+ */
+ @Nullable
+ public String getUserPassword() {
+ return userPassword;
+ }
+
+ /**
+ * Content type used for DMaaP MR Topic e.g. 'application/json'
+ *
+ * @return content type for DMaaP MR Topic
+ */
+ @Nullable
+ public String getContentType() {
+ return contentType;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("referenceName", referenceName)
+ .add("hostName", hostName)
+ .add("portNumber", portNumber)
+ .add("topicName", topicName)
+ .add("protocol", protocol)
+ .add("userName", userName)
+ .add("userPassword", "xxxx")
+ .add("contentType", contentType)
+ .toString();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java
new file mode 100644
index 0000000..7de7532
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java
@@ -0,0 +1,101 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import com.google.common.base.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * DMaaP MR Publisher Config
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/17/2017.
+ */
+public class DMaaPMRSinkPluginConfig extends BaseDMaaPMRPluginConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ @Description("Column name of input schema which contains the message that needs to be written to DMaaP MR Topic")
+ @Macro
+ protected String messageColumnName;
+
+ @Description("DMaaP MR Publisher Max Batch Size. Defaults to no Batch")
+ @Nullable
+ @Macro
+ protected Integer maxBatchSize;
+
+ @Description("DMaaP MR Publisher Recovery Queue Size. Default to 1000K messages which can be buffered in memory " +
+ "in case DMaaP MR Publisher is temporarily unavailable")
+ @Nullable
+ @Macro
+ protected Integer maxRecoveryQueueSize;
+
+ // Required No Arg constructor
+ public DMaaPMRSinkPluginConfig() {
+ this(null, null, null, null);
+ }
+
+ public DMaaPMRSinkPluginConfig(String referenceName, String hostName, String topicName, String messageColumnName) {
+ super(referenceName, hostName, topicName);
+ this.messageColumnName = messageColumnName;
+ }
+
+ /**
+ * Column name of incoming Schema field that contains the message that needs to published to DMaaP MR Topic
+ *
+ * @return Column name of incoming schema which contains message that needs to published to DMaaP MR Topic
+ */
+ public String getMessageColumnName() {
+ return messageColumnName;
+ }
+
+ /**
+ * DMaaP MR Publisher Max Batch Size.
+ *
+ * @return DMaaP MR Publisher Max Batch Size
+ */
+ @Nullable
+ public Integer getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /**
+ * DMaaP MR Publisher Max Recovery Queue Size
+ *
+ * @return DMaaP MR Publisher Max Recovery Queue Size
+ */
+ @Nullable
+ public Integer getMaxRecoveryQueueSize() {
+ return maxRecoveryQueueSize;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("super", super.toString())
+ .add("messageColumnName", messageColumnName)
+ .add("maxBatchSize", maxBatchSize)
+ .add("maxRecoveryQueueSize", maxRecoveryQueueSize)
+ .toString();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java
new file mode 100644
index 0000000..a91da35
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java
@@ -0,0 +1,134 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import com.google.common.base.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * DMaaP MR Subscriber Config
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/17/2017.
+ */
+public class DMaaPMRSourcePluginConfig extends BaseDMaaPMRPluginConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ @Description("DMaaP MR Polling Interval in MS")
+ @Macro
+ protected Integer pollingInterval;
+
+ @Description("DMaaP Message Router Subscriber Consumer ID. Defaults to some randomly created userID")
+ @Nullable
+ @Macro
+ protected String consumerId;
+
+ @Description("DMaaP Message Router Subscriber Consumer Group. Defaults to some randomly created user Group")
+ @Nullable
+ @Macro
+ protected String consumerGroup;
+
+ @Description("DMaaP Message Router Subscriber Timeout in MS. Defaults to no timeout")
+ @Nullable
+ @Macro
+ protected Integer timeoutMS;
+
+ @Description("DMaaP Message Router Subscriber Message Limit. Defaults to no message limit")
+ @Nullable
+ @Macro
+ protected Integer messageLimit;
+
+ // Required No Arg constructor
+ public DMaaPMRSourcePluginConfig() {
+ this(null, null, null, 0);
+ }
+
+ public DMaaPMRSourcePluginConfig(String referenceName, String hostName, String topicName, Integer pollingInterval) {
+ super(referenceName, hostName, topicName);
+ this.pollingInterval = pollingInterval;
+ }
+
+ /**
+ * DMaaP MR Subscriber Polling interval
+ *
+ * @return DMaaP MR Subscriber Polling interval
+ */
+ public Integer getPollingInterval() {
+ return pollingInterval;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer ID
+ *
+ * @return DMaaP MR Subscriber Consumer ID
+ */
+ @Nullable
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ /**
+ * DMaaP MR Subscriber Consumer Group
+ *
+ * @return DMaaP MR Subscriber Consumer Group
+ */
+ @Nullable
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ /**
+ * DMaaP MR Subscriber Timeout in MS
+ *
+ * @return DMaaP MR Subscriber Timeout in MS
+ */
+ @Nullable
+ public Integer getTimeoutMS() {
+ return timeoutMS;
+ }
+
+ /**
+ * DMaaP MR Subscriber message limit
+ *
+ * @return DMaaP MR Subscriber Message limit
+ */
+ @Nullable
+ public Integer getMessageLimit() {
+ return messageLimit;
+ }
+
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("super", super.toString())
+ .add("pollingInterval", pollingInterval)
+ .add("consumerId", consumerId)
+ .add("consumerGroup", consumerGroup)
+ .add("timeoutMS", timeoutMS)
+ .add("messageLimit", messageLimit)
+ .toString();
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java
new file mode 100644
index 0000000..8bb768f
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java
@@ -0,0 +1,125 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import co.cask.cdap.api.annotation.Name;
+import com.google.common.base.Objects;
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig;
+
+/**
+ * Configuration for Json Path Filter Plugin
+ *
+ * @author Rajiv Singla . Creation Date: 3/2/2017.
+ */
+public class JsonPathFilterPluginConfig extends CDAPBasePluginConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ @Name("incomingJsonFieldName")
+ @Description("Input schema field name that contain JSON used for filtering")
+ @Macro
+ protected String incomingJsonFieldName;
+
+
+ @Name("outputSchemaFieldName")
+ @Description("Name of the nullable boolean schema field name that will contain result of the filter matching")
+ @Macro
+ protected String outputSchemaFieldName;
+
+
+ @Name("jsonFilterMappings")
+ @Macro
+ @Description("Filters incoming JSON based on given filter mappings - in terms of JSON path and expected values." +
+ "Right hand side contains JSON path. Left hand side contains semicolon (';') separated expected values " +
+ "for that JSON Path. If all provided JSON Path mappings and corresponding values matches - " +
+ "output schema field will be marked as true")
+ protected String jsonFilterMappings;
+
+
+ @Name("schema")
+ @Description("Output Schema")
+ protected String schema;
+
+
+ public JsonPathFilterPluginConfig(final String referenceName, final String incomingJsonFieldName,
+ final String outputSchemaFieldName, final String jsonFilterMappings,
+ final String schema) {
+ this.referenceName = referenceName;
+ this.incomingJsonFieldName = incomingJsonFieldName;
+ this.outputSchemaFieldName = outputSchemaFieldName;
+ this.jsonFilterMappings = jsonFilterMappings;
+ this.schema = schema;
+ }
+
+ /**
+ * Provides incoming plugin schema field name which contains json used to apply filter
+ *
+ * @return name of incoming schema field containing JSON to be filtered
+ */
+ public String getIncomingJsonFieldName() {
+ return incomingJsonFieldName;
+ }
+
+ /**
+ * Provides plugin output schema filed name that will contain result of filter application
+ * It must be nullable and boolean type
+ *
+ * @return name of outgoing schema filed name that will contain filtering result
+ */
+ public String getOutputSchemaFieldName() {
+ return outputSchemaFieldName;
+ }
+
+ /**
+ * Provides JSON filter mappings. LHS contains JSON path value and RHS contains expected
+ * values separated by semicolon
+ *
+ *
+ * @return String for JSON filter mappings
+ */
+ public String getJsonFilterMappings() {
+ return jsonFilterMappings;
+ }
+
+ /**
+ * Output Schema
+ *
+ * @return output schema string
+ */
+ public String getSchema() {
+ return schema;
+ }
+
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("referenceName", referenceName)
+ .add("incomingJsonFieldName", incomingJsonFieldName)
+ .add("outputSchemaFieldName", outputSchemaFieldName)
+ .add("jsonFilterMappings", jsonFilterMappings)
+ .add("schema", schema)
+ .toString();
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java
new file mode 100644
index 0000000..d9c2b7a
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java
@@ -0,0 +1,154 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import com.google.common.base.Objects;
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Simple TCA Plugin Configuration
+ * <p>
+ * @author Rajiv Singla . Creation Date: 2/13/2017.
+ */
+public class SimpleTCAPluginConfig extends CDAPBasePluginConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ @Description("Field name containing VES Message")
+ @Macro
+ protected String vesMessageFieldName;
+
+ @Description("Policy JSON that need to be applied to VES Message")
+ @Macro
+ protected String policyJson;
+
+ @Description("Name of the output field that will contain the alert")
+ @Macro
+ protected String alertFieldName;
+
+ @Description("Name of the output field that will contain message type: INAPPLICABLE, COMPLIANT, NON_COMPLIANT")
+ @Macro
+ protected String messageTypeFieldName;
+
+ @Description("Specifies the output schema")
+ protected String schema;
+
+ @Description("Enables")
+ @Nullable
+ @Macro
+ protected Boolean enableAlertCEFFormat;
+
+
+ /**
+ * Creates an instance of TCA Plugin Configs
+ *
+ * @param vesMessageFieldName Ves message field name from incoming plugin schema
+ * @param policyJson TCA Policy Json String
+ * @param alertFieldName Alert field name that will be added in TCA plugin output schema
+ * @param messageTypeFieldName Message type field name that will be added in TCA plugin output schema
+ * @param schema TCA Plugin output schema
+ * @param enableAlertCEFFormat enables alert message to be formatted in VES format
+ */
+ public SimpleTCAPluginConfig(final String vesMessageFieldName, final String policyJson,
+ final String alertFieldName, final String messageTypeFieldName,
+ final String schema, final Boolean enableAlertCEFFormat) {
+ this.vesMessageFieldName = vesMessageFieldName;
+ this.policyJson = policyJson;
+ this.alertFieldName = alertFieldName;
+ this.messageTypeFieldName = messageTypeFieldName;
+ this.schema = schema;
+ this.enableAlertCEFFormat = enableAlertCEFFormat;
+ }
+
+ /**
+ * Name of the field containing VES Message
+ *
+ * @return VES Message field name
+ */
+ public String getVesMessageFieldName() {
+ return vesMessageFieldName;
+ }
+
+ /**
+ * Policy Json String
+ *
+ * @return Policy Json String
+ */
+ public String getPolicyJson() {
+ return policyJson;
+ }
+
+
+ /**
+ * Alert Field name in outgoing schema
+ *
+ * @return alert field name in outgoing schema
+ */
+ public String getAlertFieldName() {
+ return alertFieldName;
+ }
+
+ /**
+ * Returns output schema string
+ *
+ * @return output schema string
+ */
+ public String getSchema() {
+ return schema;
+ }
+
+ /**
+ * Return TCA message type - INAPPLICABLE, COMPLIANT, NON_COMPLIANT
+ *
+ * @return tca message type
+ */
+ public String getMessageTypeFieldName() {
+ return messageTypeFieldName;
+ }
+
+
+ /**
+ * Returns if Alert output in Common Event format
+ *
+ * @return true if alert output is in common event format
+ */
+ @Nullable
+ public Boolean getEnableAlertCEFFormat() {
+ return enableAlertCEFFormat;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("referenceName", referenceName)
+ .add("vesMessageFieldName", vesMessageFieldName)
+ .add("policyJson", policyJson)
+ .add("alertFieldName", alertFieldName)
+ .add("messageTypeFieldName", messageTypeFieldName)
+ .add("schema", schema)
+ .add("enableAlertCEFFormat", true)
+ .toString();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java
new file mode 100644
index 0000000..5874d0a
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java
@@ -0,0 +1,59 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap;
+
+import co.cask.cdap.api.data.schema.Schema;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.common.PluginSchema;
+
+/**
+ * Output Schema for DMaaP MR Source Plugin
+ *
+ * @author Rajiv Singla . Creation Date: 1/25/2017.
+ */
+public enum DMaaPSourceOutputSchema implements PluginSchema {
+
+ TIMESTAMP("ts"),
+ RESPONSE_CODE("responseCode"),
+ RESPONSE_MESSAGE("responseMessage"),
+ FETCHED_MESSAGE("message");
+
+ private String schemaColumnName;
+
+ DMaaPSourceOutputSchema(String schemaColumnName) {
+ this.schemaColumnName = schemaColumnName;
+ }
+
+ @Override
+ public String getSchemaColumnName() {
+ return schemaColumnName;
+ }
+
+ public static Schema getSchema() {
+ return Schema.recordOf(
+ "DMaaPMRSourcePluginResponse",
+ Schema.Field.of(TIMESTAMP.getSchemaColumnName(), Schema.of(Schema.Type.LONG)),
+ Schema.Field.of(RESPONSE_CODE.getSchemaColumnName(), Schema.of(Schema.Type.INT)),
+ Schema.Field.of(RESPONSE_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING)),
+ Schema.Field.of(FETCHED_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING))
+ );
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java
new file mode 100644
index 0000000..b915ade
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java
@@ -0,0 +1,175 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Name;
+import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.format.StructuredRecord.Builder;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.StageMetrics;
+import co.cask.cdap.etl.api.batch.SparkCompute;
+import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.SimpleTCAPluginConfigValidator;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Rajiv Singla . Creation Date: 2/13/2017.
+ */
+
+@Plugin(type = SparkCompute.PLUGIN_TYPE)
+@Name("SimpleTCAPlugin")
+@Description("Used to create TCA (Threshold Crossing Alert) based on given Policy")
+@SuppressFBWarnings("SE_INNER_CLASS")
+public class SimpleTCAPlugin extends SparkCompute<StructuredRecord, StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPlugin.class);
+ private static final long serialVersionUID = 1L;
+
+ private final SimpleTCAPluginConfig pluginConfig;
+
+ /**
+ * Create an instance of Simple TCA Plugin with give Simple TCA Plugin Config
+ *
+ * @param pluginConfig Simple TCA Plugin Config
+ */
+ public SimpleTCAPlugin(SimpleTCAPluginConfig pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ LOG.info("Creating instance of Simple TCA Plugin with plugin config: {}", pluginConfig);
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ ValidationUtils.validateSettings(pluginConfig, new SimpleTCAPluginConfigValidator());
+ final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
+ CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getVesMessageFieldName());
+ CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema());
+ }
+
+ @Override
+ public JavaRDD<StructuredRecord> transform(final SparkExecutionPluginContext context,
+ final JavaRDD<StructuredRecord> input) throws Exception {
+ final StageMetrics metrics = context.getMetrics();
+
+ LOG.debug("Invoking Spark Transform for Simple TCA Plugin");
+ return input.map(new Function<StructuredRecord, StructuredRecord>() {
+
+ @Override
+ public StructuredRecord call(StructuredRecord inputStructuredRecord) throws Exception {
+ TCACalculatorMessageType calculatorMessageType;
+ String alertMessage = null;
+
+ // Get input structured record
+ final String cefMessage = inputStructuredRecord.get(pluginConfig.getVesMessageFieldName());
+
+ // Get TCA Policy
+ final TCAPolicy tcaPolicy = CDAPPluginUtils.readValue(pluginConfig.getPolicyJson(), TCAPolicy.class);
+
+ // create initial processor context
+ final TCACEFProcessorContext initialProcessorContext =
+ new TCACEFProcessorContext(cefMessage, tcaPolicy);
+
+ final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
+ final TCACEFProcessorContext jsonProcessorContext =
+ jsonProcessor.processMessage(initialProcessorContext);
+
+ if (jsonProcessorContext.getCEFEventListener() != null) {
+
+ LOG.debug("Json to CEF parsing successful. Parsed object {}",
+ jsonProcessorContext.getCEFEventListener());
+
+ // compute violations
+ final TCACEFProcessorContext processorContextWithViolations =
+ TCAUtils.computeThresholdViolations(jsonProcessorContext);
+
+ // if violation are found then create alert message
+ if (processorContextWithViolations.canProcessingContinue()) {
+
+ alertMessage = TCAUtils.createTCAAlertString(processorContextWithViolations,
+ pluginConfig.getReferenceName(), pluginConfig.getEnableAlertCEFFormat());
+ calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT;
+
+ LOG.debug("VES Threshold Violation Detected.An alert message is be generated: {}",
+ alertMessage);
+
+ final MetricsPerFunctionalRole metricsPerFunctionalRole =
+ processorContextWithViolations.getMetricsPerFunctionalRole();
+ if (metricsPerFunctionalRole != null
+ && metricsPerFunctionalRole.getThresholds() != null
+ && metricsPerFunctionalRole.getThresholds().get(0) != null) {
+ final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0);
+ LOG.debug("CEF Message: {}, Violated Threshold: {}", cefMessage, violatedThreshold);
+ }
+
+ metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
+
+ } else {
+ LOG.debug("No Threshold Violation Detected. No alert will be generated.");
+ calculatorMessageType = TCACalculatorMessageType.COMPLIANT;
+ metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1);
+ }
+
+ } else {
+ LOG.info("Unable to parse provided json message to CEF format. Invalid message: {}", cefMessage);
+ calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
+ }
+
+ LOG.debug("Calculator message type: {} for message: {}", calculatorMessageType, cefMessage);
+
+ final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema());
+
+ // create new output record builder and copy any input record values to output record builder
+ final Builder outputRecordBuilder =
+ CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord);
+
+ // add alert field
+ final Builder outputRecordBuilderWithAlertField =
+ CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder,
+ outputSchema, pluginConfig.getAlertFieldName(), alertMessage);
+
+ // add message field type
+ final Builder outRecordBuilderWithMessageTypeField =
+ CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilderWithAlertField,
+ outputSchema, pluginConfig.getMessageTypeFieldName(), calculatorMessageType.toString());
+
+ return outRecordBuilderWithMessageTypeField.build();
+ }
+ });
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java
new file mode 100644
index 0000000..aac7fa6
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java
@@ -0,0 +1,118 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.metrics.Metrics;
+import com.google.common.base.Optional;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/19/2017.
+ */
+public class DMaaPMRReceiver extends Receiver<StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRReceiver.class);
+ private static final long serialVersionUID = 1L;
+
+ private final DMaaPMRSourcePluginConfig pluginConfig;
+ private final Metrics metrics;
+
+ public DMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig,
+ final Metrics metrics) {
+ super(storageLevel);
+ this.pluginConfig = pluginConfig;
+ this.metrics = metrics;
+ LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);
+ }
+
+ @Override
+ public void onStart() {
+
+ // create DMaaP MR Subscriber
+ final DMaaPMRSubscriber subscriber =
+ DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
+
+ // Start a new thread with indefinite loop until receiver is stopped
+ new Thread() {
+ @Override
+ public void run() {
+ while (!isStopped()) {
+ storeStructuredRecords(subscriber);
+ try {
+ final Integer pollingInterval = pluginConfig.getPollingInterval();
+ LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval);
+ TimeUnit.MILLISECONDS.sleep(pollingInterval);
+ } catch (InterruptedException e) {
+ final String errorMessage = String.format(
+ "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+ }
+ }.start();
+
+ }
+
+ @Override
+ public void onStop() {
+ LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);
+ }
+
+ /**
+ * Fetches records from DMaaP MR Subscriber and store them as structured records
+ *
+ * @param subscriber DMaaP MR Subscriber Instance
+ */
+ public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {
+
+ LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");
+
+ // Fetch messages from DMaaP MR Topic
+ final Optional<List<String>> subscriberMessagesOptional =
+ DMaaPMRUtils.getSubscriberMessages(subscriber, metrics);
+
+ // store records
+ if (subscriberMessagesOptional.isPresent()) {
+ final List<String> messages = subscriberMessagesOptional.get();
+ for (final String message : messages) {
+ store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(message));
+ }
+ LOG.debug("Stored DMaaP Subscriber messages as Structured Records. Message count {}", messages.size());
+ }
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java
new file mode 100644
index 0000000..a9ecfea
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java
@@ -0,0 +1,70 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Name;
+import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.streaming.StreamingContext;
+import co.cask.cdap.etl.api.streaming.StreamingSource;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSourcePluginConfigValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DMaaP MR Source Plugin which polls DMaaP MR topic at frequent intervals
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/18/2017.
+ */
+@Plugin(type = StreamingSource.PLUGIN_TYPE)
+@Name("DMaaPMRSource")
+@Description("Fetches DMaaP MR Messages at regular intervals")
+public class DMaaPMRSource extends StreamingSource<StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSource.class);
+ private static final long serialVersionUID = 1L;
+
+ private final DMaaPMRSourcePluginConfig pluginConfig;
+
+ public DMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) {
+ LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig);
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSourcePluginConfigValidator());
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(DMaaPSourceOutputSchema.getSchema());
+ }
+
+ @Override
+ public JavaDStream<StructuredRecord> getStream(final StreamingContext streamingContext) throws Exception {
+ return streamingContext.getSparkStreamingContext().receiverStream(
+ new DMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig, streamingContext.getMetrics()));
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java
new file mode 100644
index 0000000..a318406
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java
@@ -0,0 +1,132 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue;
+import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString;
+
+/**
+ * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/19/2017.
+ */
+public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class);
+ private static final long serialVersionUID = 1L;
+
+ private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
+ private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
+ new TypeReference<List<EventListener>>() {
+ };
+
+ private final DMaaPMRSourcePluginConfig pluginConfig;
+
+ public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) {
+ super(storageLevel);
+ this.pluginConfig = pluginConfig;
+ LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);
+ }
+
+ @Override
+ public void onStart() {
+
+ // create DMaaP MR Subscriber
+ final DMaaPMRSubscriber subscriber =
+ DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
+ storeStructuredRecords(subscriber);
+
+ }
+
+ @Override
+ public void onStop() {
+ LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);
+ }
+
+ /**
+ * Fetches records from DMaaP MR Subscriber and store them as structured records
+ *
+ * @param subscriber DMaaP MR Subscriber Instance
+ */
+ public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {
+
+ LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");
+
+ try (InputStream resourceAsStream =
+ Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) {
+
+ if (resourceAsStream == null) {
+ LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
+ throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
+ }
+
+ List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
+
+ final int totalMessageCount = eventListeners.size();
+ LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
+
+ int i = 1;
+ for (EventListener eventListener : eventListeners) {
+ if (isStopped()) {
+ return;
+ }
+ final String eventListenerString = writeValueAsString(eventListener);
+ LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
+ store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString));
+ i++;
+ try {
+ TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval());
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping");
+ throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
+ }
+
+ }
+
+ LOG.debug("Finished writing mock messages to CDAP Stream");
+
+ } catch (IOException e) {
+ LOG.error("Error while parsing json file");
+ throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
+ }
+ }
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java
new file mode 100644
index 0000000..e0be12f
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java
@@ -0,0 +1,73 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Name;
+import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.streaming.StreamingContext;
+import co.cask.cdap.etl.api.streaming.StreamingSource;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mock implementation of DMaaP MR Receiver which sends mock ves messages
+ * <p>
+ * @author Rajiv Singla . Creation Date: 2/15/2017.
+ */
+@Plugin(type = StreamingSource.PLUGIN_TYPE)
+@Name("MockDMaaPMRSource")
+@Description("Fetches DMaaP MR Messages at regular intervals")
+public class MockDMaaPMRSource extends StreamingSource<StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRSource.class);
+ private static final long serialVersionUID = 1L;
+
+ private final DMaaPMRSourcePluginConfig pluginConfig;
+
+ public MockDMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) {
+ LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig);
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ final Integer pollingInterval = pluginConfig.getPollingInterval();
+ if (pollingInterval == null) {
+ final String errorMessage = "Polling Interval field must be present";
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ } else {
+ LOG.info("Mock Message will be send every ms: {}", pollingInterval);
+ }
+ }
+
+ @Override
+ public JavaDStream<StructuredRecord> getStream(final StreamingContext streamingContext) throws Exception {
+ return streamingContext.getSparkStreamingContext().receiverStream(
+ new MockDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig));
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java
new file mode 100644
index 0000000..ae0d00a
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java
@@ -0,0 +1,134 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.transform.filter;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Name;
+import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.Emitter;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import co.cask.cdap.etl.api.Transform;
+import co.cask.cdap.etl.api.TransformContext;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.JsonPathFilterPluginConfigValidator;
+import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext;
+import org.openecomp.dcae.apod.analytics.common.utils.MessageProcessorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Json Path filter Plugin filters incoming schema field based of given json path expected values
+ * <p>
+ * @author Rajiv Singla . Creation Date: 3/2/2017.
+ */
+
+@Plugin(type = Transform.PLUGIN_TYPE)
+@Name("JsonPathFilter")
+@Description("Filters incoming schema field based of given json path expected values")
+public class JsonPathFilter extends Transform<StructuredRecord, StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JsonPathFilter.class);
+
+ private final JsonPathFilterPluginConfig pluginConfig;
+ private final Map<String, Set<String>> jsonFilterPathMappings;
+
+ public JsonPathFilter(final JsonPathFilterPluginConfig pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ jsonFilterPathMappings = Maps.newHashMap();
+ LOG.info("Created instance of Json Path Filter Plugin with plugin config: {}", pluginConfig);
+ }
+
+
+ @Override
+ public void initialize(final TransformContext context) throws Exception {
+ super.initialize(context);
+ populateJsonFilterMapping();
+ }
+
+ @Override
+ public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ ValidationUtils.validateSettings(pluginConfig, new JsonPathFilterPluginConfigValidator());
+ final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
+ CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getIncomingJsonFieldName());
+ populateJsonFilterMapping();
+ CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema());
+ }
+
+ @Override
+ public void transform(final StructuredRecord inputStructuredRecord, final Emitter<StructuredRecord> emitter)
+ throws Exception {
+
+ // get input json message
+ final String jsonMessage = inputStructuredRecord.get(pluginConfig.getIncomingJsonFieldName());
+
+ // process Json Filter Mappings
+ final JsonMessageFilterProcessorContext jsonMessageFilterProcessorContext =
+ MessageProcessorUtils.processJsonFilterMappings(jsonMessage, jsonFilterPathMappings);
+
+ // create new output record builder and copy any input Structured record values to output record builder
+ final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema());
+ final StructuredRecord.Builder outputRecordBuilder =
+ CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord);
+
+ // add json filter matched field
+ final StructuredRecord.Builder outputRecordBuilderWithMatchedField =
+ CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder,
+ outputSchema, pluginConfig.getOutputSchemaFieldName(),
+ jsonMessageFilterProcessorContext.getMatched());
+
+ // emit structured record with filtering matched field
+ final StructuredRecord outputStructuredRecord = outputRecordBuilderWithMatchedField.build();
+
+ LOG.debug("Incoming Json Message: {}.Json Path Filter Output Matched Field: {}", jsonMessage,
+ outputStructuredRecord.get(pluginConfig.getOutputSchemaFieldName()));
+
+ emitter.emit(outputStructuredRecord);
+
+ }
+
+ /**
+ * Populates Json Filter Mapping
+ */
+ private void populateJsonFilterMapping() {
+ final Map<String, String> fieldMappings =
+ CDAPPluginUtils.extractFieldMappings(pluginConfig.getJsonFilterMappings());
+ if (fieldMappings.isEmpty()) {
+ throw new IllegalArgumentException("No Field Mapping found. Invalid Filter mapping configuration");
+ }
+ final Splitter semiColonSplitter = Splitter.on(";");
+ for (Map.Entry<String, String> fieldMappingEntry : fieldMappings.entrySet()) {
+ jsonFilterPathMappings.put(fieldMappingEntry.getKey(),
+ Sets.newLinkedHashSet(semiColonSplitter.split(fieldMappingEntry.getValue())));
+ }
+ LOG.info("Input Json Filter Mappings: {}", jsonFilterPathMappings);
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java
new file mode 100644
index 0000000..af191c5
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java
@@ -0,0 +1,295 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/26/2017.
+ */
+public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
+
+ public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
+ @Override
+ public Schema.Type apply(@Nonnull Schema schema) {
+ return schema.getType();
+ }
+ };
+
+
+
+ private CDAPPluginUtils() {
+ // private constructor
+ }
+
+ /**
+ * Validates if CDAP Schema contains expected fields
+ *
+ * @param schema schema that need to be validated
+ * @param expectedFields fields that are expected to be in the schema
+ */
+
+ public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
+
+ LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
+
+ if (schema == null) {
+ // If input schema is null then no validation possible
+ LOG.warn("Input Schema is null. No validation possible");
+ } else {
+ // Check if expected fields are indeed present in the schema
+ for (String expectedField : expectedFields) {
+ final Schema.Field schemaField = schema.getField(expectedField);
+ if (schemaField == null) {
+ final String errorMessage = String.format(
+ "Unable to find expected field: %s, in schema: %s", expectedField, schema);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ }
+ LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
+ Arrays.toString(expectedFields));
+ }
+ }
+
+
+ /**
+ * Creates a new Structured Record containing DMaaP MR fetched message
+ *
+ * @param message DMaaP MR fetch message
+ *
+ * @return Structured record containing DMaaP MR Message
+ */
+ public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
+ recordBuilder
+ .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
+ .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
+ .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
+ .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
+ return recordBuilder.build();
+ }
+
+
+ /**
+ * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord}
+ *
+ * @param outputSchema output Schema
+ * @param inputStructuredRecord input Structured Record
+ *
+ * @return output Structured Record builder with pre populated values from input structured record
+ */
+ public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
+ @Nonnull final Schema outputSchema,
+ @Nonnull final StructuredRecord inputStructuredRecord) {
+
+ // Get input structured Record Schema
+ final Schema inputSchema = inputStructuredRecord.getSchema();
+ // Create new instance of output Structured Record Builder from output Schema
+ final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
+
+ // iterate over input fields and if output schema has field with same name copy the value to out record builder
+ for (Schema.Field inputField : inputSchema.getFields()) {
+ final String inputFieldName = inputField.getName();
+ if (outputSchema.getField(inputFieldName) != null) {
+ outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
+ }
+ }
+
+ return outputStructuredRecordBuilder;
+ }
+
+
+ /**
+ * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name
+ *
+ * @param structuredRecordBuilder structured record builder
+ * @param structuredRecordSchema schema for structured record builder
+ * @param fieldName field name
+ * @param fieldValue field value
+ *
+ * @return structured record builder with populated field name and value if schema contains field name
+ */
+ public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
+ @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
+ @Nonnull final Schema structuredRecordSchema,
+ @Nonnull final String fieldName,
+ final Object fieldValue) {
+
+ // check if schema contains field Name
+ if (structuredRecordSchema.getField(fieldName) != null) {
+ structuredRecordBuilder.set(fieldName, fieldValue);
+ } else {
+ LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
+ "Schema Fields: {} does not contain field name: {}",
+ fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
+ }
+
+ return structuredRecordBuilder;
+ }
+
+
+ /**
+ * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
+ * then validation will pass with warning. If field does exist in given schema then this validation will return
+ * true if field type is same as expected type else false
+ *
+ * @param schemaString CDAP Plugin output or input schema string
+ * @param fieldName field name
+ * @param expectedFieldType expected schema field type
+ *
+ * @return true if field type matches expected field type else false. If field does not exist in
+ * give schema validation will pass but will generate a warning message
+ */
+ public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
+ @Nonnull final String fieldName,
+ @Nonnull final Schema.Type expectedFieldType) {
+
+ try {
+ // parse given schema String
+ final Schema outputSchema = Schema.parseJson(schemaString);
+ final Schema.Field schemaField = outputSchema.getField(fieldName);
+
+ // if given schema does contain field then validated fieldName type
+ if (schemaField != null) {
+
+ final List<Schema> schemas = new LinkedList<>();
+
+ // if it is a union type then grab all union schemas
+ if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
+ final List<Schema> unionFieldSchemas =
+ outputSchema.getField(fieldName).getSchema().getUnionSchemas();
+ schemas.addAll(unionFieldSchemas);
+ } else {
+ // if not union type the just get the field schema
+ final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
+ schemas.add(fieldSchema);
+ }
+
+ // get all schema types
+ final List<Schema.Type> fieldTypes =
+ Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
+
+ // if all schema types does not contain expected field type then return false
+ if (!fieldTypes.contains(expectedFieldType)) {
+ LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
+ fieldName, expectedFieldType, outputSchema);
+ return false;
+ }
+
+ // field type validation passed
+ LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
+ fieldName, expectedFieldType);
+
+ return true;
+
+ } else {
+
+ // if field does not exist then the validation will pass but will generate warning message
+ LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
+ fieldName, outputSchema);
+ return true;
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ String.format("Unable to parse schema: %s for field type validation. " +
+ "Field Name: %s, Expected Field Type: %s Exception: %s",
+ schemaString, fieldName, expectedFieldType, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ /**
+ * Parses provided schema String as Schema object and set it as output Schema format
+ *
+ * @param pipelineConfigurer plugin pipeline configurer
+ * @param schemaString schema String to be set as output schema
+ */
+ public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
+ try {
+ final Schema outputSchema = Schema.parseJson(schemaString);
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
+ } catch (IOException e) {
+ final String errorMessage = String.format(
+ "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ }
+
+
+ /**
+ * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
+ * exception will be thrown
+ *
+ * @param mappingFieldString field Mapping String
+ *
+ * @return map containing mapping key values
+ */
+ public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
+ final Map<String, String> fieldMappings = Maps.newHashMap();
+ if (StringUtils.isNotBlank(mappingFieldString)) {
+ final Splitter commaSplitter = Splitter.on(",");
+ for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
+ final String[] keyValueMappings = fieldMapping.split(":");
+ if (keyValueMappings.length != 2 ||
+ StringUtils.isBlank(keyValueMappings[0]) ||
+ StringUtils.isBlank(keyValueMappings[1])) {
+ final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
+ "be present in mappings: " + mappingFieldString;
+ throw new DCAEAnalyticsRuntimeException(
+ errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
+ }
+ }
+ return fieldMappings;
+ }
+
+
+
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java
new file mode 100644
index 0000000..ebe7d49
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java
@@ -0,0 +1,112 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import com.google.common.base.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+/**
+ * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/26/2017.
+ */
+public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> {
+
+ /**
+ * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ *
+ * @param sinkPluginConfig DMaaP Sink Plugin Config
+ *
+ * @return DMaaP MR Publisher Config
+ */
+ public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) {
+ return new DMaaPSinkConfigMapper().apply(sinkPluginConfig);
+ }
+
+ /**
+ * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ *
+ * @param configuration Hadoop Configuration containing DMaaP MR Sink field values
+ *
+ * @return DMaaP MR Publisher Config
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) {
+
+ // Create a new publisher settings builder
+ final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);
+ final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);
+
+ if (isEmpty(hostName) || isEmpty(topicName)) {
+ throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present");
+ }
+
+ final DMaaPMRPublisherConfig.Builder publisherConfigBuilder =
+ new DMaaPMRPublisherConfig.Builder(hostName, topicName);
+
+ // Setup up any optional publisher parameters if they are present
+ final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);
+ if (portNumber != null) {
+ publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber));
+ }
+
+ final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);
+ if (isPresent(protocol)) {
+ publisherConfigBuilder.setProtocol(protocol);
+ }
+
+ final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME);
+ if (isPresent(userName)) {
+ publisherConfigBuilder.setUserName(userName);
+ }
+
+ final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS);
+ if (isPresent(userPassword)) {
+ publisherConfigBuilder.setUserPassword(userPassword);
+ }
+
+ final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE);
+ if (isPresent(contentType)) {
+ publisherConfigBuilder.setContentType(contentType);
+ }
+
+ final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE);
+ if (maxBatchSize != null) {
+ publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize));
+ }
+
+ final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE);
+ if (maxRecoveryQueueSize != null) {
+ publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize));
+ }
+
+ return publisherConfigBuilder.build();
+
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java
new file mode 100644
index 0000000..8717632
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java
@@ -0,0 +1,118 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import com.google.common.base.Function;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+/**
+ * Function that converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/18/2017.
+ */
+public class DMaaPSourceConfigMapper implements Function<DMaaPMRSourcePluginConfig, DMaaPMRSubscriberConfig> {
+
+ /**
+ * Static factory method to map {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig}
+ *
+ * @param pluginConfig DMaaP MR Souce Plugin Config
+ *
+ * @return DMaaP MR Subscriber Config
+ */
+ public static DMaaPMRSubscriberConfig map(final DMaaPMRSourcePluginConfig pluginConfig) {
+ return new DMaaPSourceConfigMapper().apply(pluginConfig);
+ }
+
+ /**
+ * Converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} object
+ *
+ * @param sourcePluginConfig DMaaP MR Source Plugin Config
+ *
+ * @return DMaaP MR Subscriber Config
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRSubscriberConfig apply(@Nonnull DMaaPMRSourcePluginConfig sourcePluginConfig) {
+
+ // Create a new subscriber settings builder
+ final String hostName = sourcePluginConfig.getHostName();
+ final String topicName = sourcePluginConfig.getTopicName();
+ if (isEmpty(hostName) || isEmpty(topicName)) {
+ throw new IllegalStateException("DMaaP MR Source Host Name and Topic Name must be present");
+ }
+ final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder(
+ hostName, topicName);
+
+ // Setup up any optional subscriber parameters if they are present
+ final Integer subscriberHostPortNumber = sourcePluginConfig.getPortNumber();
+ if (subscriberHostPortNumber != null) {
+ subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber);
+ }
+
+ final String subscriberProtocol = sourcePluginConfig.getProtocol();
+ if (isPresent(subscriberProtocol)) {
+ subscriberConfigBuilder.setProtocol(subscriberProtocol);
+ }
+
+ final String subscriberUserName = sourcePluginConfig.getUserName();
+ if (isPresent(subscriberUserName)) {
+ subscriberConfigBuilder.setUserName(subscriberUserName);
+ }
+
+ final String subscriberUserPassword = sourcePluginConfig.getUserPassword();
+ if (isPresent(subscriberUserPassword)) {
+ subscriberConfigBuilder.setUserPassword(subscriberUserPassword);
+ }
+
+ final String subscriberContentType = sourcePluginConfig.getContentType();
+ if (isPresent(subscriberContentType)) {
+ subscriberConfigBuilder.setContentType(subscriberContentType);
+ }
+
+ final String subscriberConsumerId = sourcePluginConfig.getConsumerId();
+ if (isPresent(subscriberConsumerId)) {
+ subscriberConfigBuilder.setConsumerId(subscriberConsumerId);
+ }
+
+ final String subscriberConsumerGroup = sourcePluginConfig.getConsumerGroup();
+ if (isPresent(subscriberConsumerGroup)) {
+ subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup);
+ }
+
+ final Integer subscriberTimeoutMS = sourcePluginConfig.getTimeoutMS();
+ if (subscriberTimeoutMS != null) {
+ subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS);
+ }
+ final Integer subscriberMessageLimit = sourcePluginConfig.getMessageLimit();
+ if (subscriberMessageLimit != null) {
+ subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit);
+ }
+
+ // return Subscriber config
+ return subscriberConfigBuilder.build();
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java
new file mode 100644
index 0000000..e24f940
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java
@@ -0,0 +1,72 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.BaseDMaaPMRPluginConfig;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+/**
+ * Validates plugin config values which are common in DMaaP MR Configs - {@link BaseDMaaPMRPluginConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ *
+ * @param <T> {@link BaseDMaaPMRPluginConfig} Sub classes
+ */
+public abstract class BaseDMaaPMRPluginConfigValidator<T extends BaseDMaaPMRPluginConfig> implements
+ CDAPAppSettingsValidator<T, GenericValidationResponse<T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Validates the {@link BaseDMaaPMRPluginConfig} parameters
+ *
+ * @param baseDMaaPMRPluginConfig DMaaP MR Plugin Config
+ *
+ * @return Validation Response containing validation errors if any
+ */
+ @Override
+ public GenericValidationResponse<T> validateAppSettings(final T baseDMaaPMRPluginConfig) {
+
+ final GenericValidationResponse<T> validationResponse = new GenericValidationResponse<>();
+
+ if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getHostName())) {
+ validationResponse.addErrorMessage(
+ "hostName",
+ "DMaaPMRPluginConfig - hostname field is undefined: " + baseDMaaPMRPluginConfig);
+ }
+
+ if (baseDMaaPMRPluginConfig.getPortNumber() == null) {
+ validationResponse.addErrorMessage(
+ "port Number",
+ "DMaaPMRPluginConfig - host port number field is undefined: " + baseDMaaPMRPluginConfig);
+ }
+
+ if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getTopicName())) {
+ validationResponse.addErrorMessage(
+ "topic Name",
+ "DMaaPMRSourcePluginConfig - topic name field is undefined: " + baseDMaaPMRPluginConfig);
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java
new file mode 100644
index 0000000..b01f0b4
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java
@@ -0,0 +1,58 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+/**
+ * Validates plugin config values in {@link DMaaPMRSinkPluginConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRSinkPluginConfigValidator extends BaseDMaaPMRPluginConfigValidator<DMaaPMRSinkPluginConfig> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Validates plugin config values in {@link DMaaPMRSinkPluginConfig}
+ *
+ * @param sinkPluginConfig Sink Plugin Config
+ *
+ * @return Validation response containing validation errors if any
+ */
+ @Override
+ public GenericValidationResponse<DMaaPMRSinkPluginConfig> validateAppSettings(
+ final DMaaPMRSinkPluginConfig sinkPluginConfig) {
+
+ // validate settings in BaseDMaaPMRPluginConfig
+ final GenericValidationResponse<DMaaPMRSinkPluginConfig> validationResponse =
+ super.validateAppSettings(sinkPluginConfig);
+
+ if (ValidationUtils.isEmpty(sinkPluginConfig.getMessageColumnName())) {
+ validationResponse.addErrorMessage("messageColumn Name",
+ "DMaaPMRSinkPluginConfig - message column name field is undefined: " + sinkPluginConfig);
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java
new file mode 100644
index 0000000..56a658c
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java
@@ -0,0 +1,58 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+/**
+ * Validates plugin config values in {@link DMaaPMRSourcePluginConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/30/2017.
+ */
+public class DMaaPMRSourcePluginConfigValidator extends BaseDMaaPMRPluginConfigValidator<DMaaPMRSourcePluginConfig> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Validates plugin config values in {@link DMaaPMRSourcePluginConfig}
+ *
+ * @param sourcePluginConfig Source Plugin Config
+ *
+ * @return Validation response containing validation errors if any
+ */
+ @Override
+ public GenericValidationResponse<DMaaPMRSourcePluginConfig> validateAppSettings(
+ final DMaaPMRSourcePluginConfig sourcePluginConfig) {
+
+ // validate settings in BaseDMaaPMRPluginConfig
+ final GenericValidationResponse<DMaaPMRSourcePluginConfig> validationResponse =
+ super.validateAppSettings(sourcePluginConfig);
+
+ if (sourcePluginConfig.getPollingInterval() == null) {
+ validationResponse.addErrorMessage(
+ "port Number",
+ "DMaaPMRSourcePluginConfig - polling interval is undefined: " + sourcePluginConfig);
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java
new file mode 100644
index 0000000..ff2f18b
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java
@@ -0,0 +1,83 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.validator;
+
+import co.cask.cdap.api.data.schema.Schema;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+/**
+ * Validator to validate {@link JsonPathFilterPluginConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 3/2/2017.
+ */
+public class JsonPathFilterPluginConfigValidator implements CDAPAppSettingsValidator<JsonPathFilterPluginConfig,
+ GenericValidationResponse<JsonPathFilterPluginConfig>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public GenericValidationResponse<JsonPathFilterPluginConfig> validateAppSettings(
+ final JsonPathFilterPluginConfig jsonPathFilterPluginConfig) {
+
+ final GenericValidationResponse<JsonPathFilterPluginConfig> validationResponse =
+ new GenericValidationResponse<>();
+
+ final String jsonFilterMappings = jsonPathFilterPluginConfig.getJsonFilterMappings();
+ if (ValidationUtils.isEmpty(jsonFilterMappings)) {
+
+ validationResponse.addErrorMessage("JsonFilterMappings", "Json Filter Mappings must be present");
+ }
+
+
+ final String matchedField = jsonPathFilterPluginConfig.getOutputSchemaFieldName();
+ final String outputSchemaJson = jsonPathFilterPluginConfig.getSchema();
+
+ if (ValidationUtils.isEmpty(outputSchemaJson)) {
+
+ validationResponse.addErrorMessage("output schema", "Output schema is not present");
+
+ } else {
+
+ // validate matched output field type is boolean
+ if (matchedField != null &&
+ !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.BOOLEAN)) {
+ validationResponse.addErrorMessage("OutputSchemaFieldName",
+ String.format(
+ "OutputSchemaFieldName: %s must be marked as boolean type", matchedField));
+ }
+
+ // validate matched output field type is nullable
+ if (matchedField != null &&
+ !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.NULL)) {
+ validationResponse.addErrorMessage("OutputSchemaFieldName",
+ String.format(
+ "OutputSchemaFieldName: %s must be marked as nullable type", matchedField));
+ }
+
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java
new file mode 100644
index 0000000..e0942ff
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java
@@ -0,0 +1,91 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.validator;
+
+import co.cask.cdap.api.data.schema.Schema;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+/**
+ * Validator that validate {@link SimpleTCAPluginConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 2/21/2017.
+ */
+public class SimpleTCAPluginConfigValidator implements CDAPAppSettingsValidator<SimpleTCAPluginConfig,
+ GenericValidationResponse<SimpleTCAPluginConfig>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public GenericValidationResponse<SimpleTCAPluginConfig> validateAppSettings(
+ final SimpleTCAPluginConfig tcaPluginConfig) {
+
+ final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse = new GenericValidationResponse<>();
+
+ if (ValidationUtils.isEmpty(tcaPluginConfig.getVesMessageFieldName())) {
+ validationResponse.addErrorMessage("vesMessageFieldName",
+ "Missing VES Message Field Name from plugin incoming schema");
+ }
+
+ if (ValidationUtils.isEmpty(tcaPluginConfig.getPolicyJson())) {
+ validationResponse.addErrorMessage("policyJson",
+ "Missing tca Policy Json");
+ }
+
+ final String alertFieldValue = tcaPluginConfig.getAlertFieldName();
+ final String alertFieldName = "alertFieldName";
+ if (ValidationUtils.isEmpty(alertFieldValue)) {
+ validationResponse.addErrorMessage(alertFieldName,
+ "Missing alert Field Name that will be placed in plugin outgoing schema");
+ }
+
+ if (ValidationUtils.isEmpty(tcaPluginConfig.getMessageTypeFieldName())) {
+ validationResponse.addErrorMessage("messageTypeField",
+ "Missing message Type Field Name that will be placed in plugin outgoing schema");
+ }
+
+
+ final String outputSchemaJson = tcaPluginConfig.getSchema();
+ if (ValidationUtils.isEmpty(outputSchemaJson)) {
+ validationResponse.addErrorMessage("output schema", "Output schema is not present");
+ } else {
+ // validate output schema - alert field name is of type string
+ if (alertFieldValue != null &&
+ !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.STRING)) {
+ validationResponse.addErrorMessage(alertFieldName,
+ String.format(
+ "Alert Field Name: %s must be String type", alertFieldValue));
+ }
+ // validate output schema - alert field name is nullable
+ if (alertFieldValue != null &&
+ !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.NULL)) {
+ validationResponse.addErrorMessage(alertFieldName,
+ String.format(
+ "Alert Field Name: %s must be marked as nullable type", alertFieldValue));
+ }
+ }
+
+ return validationResponse;
+ }
+}