diff options
author | an4828 <nekrassov@att.com> | 2017-08-21 11:05:08 -0400 |
---|---|---|
committer | Lusheng Ji <lji@research.att.com> | 2017-08-24 00:56:45 +0000 |
commit | e86be39dc5ff812b73398e0720aa3fbf0c48213c (patch) | |
tree | fe3198f180128163490c14c66a1d6074760b220b /dcae-analytics-cdap-plugins/src/main/java/org | |
parent | ff6a13c7ce03ec95fba9d0b4f04b74d0bfeb6a47 (diff) |
Initial TCA commit into DCAEGEN2
Change-Id: I5f7f8af2a00419854cafc34b79277df60d1af095
Issue-ID: DCAEGEN2-53
Signed-off-by: an4828 <nekrassov@att.com>
Diffstat (limited to 'dcae-analytics-cdap-plugins/src/main/java/org')
25 files changed, 2716 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java new file mode 100644 index 0000000..c89f424 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java @@ -0,0 +1,94 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSinkConfigMapper; +import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; + +import java.io.IOException; + +/** + * DMaaP MR Output format used by DMaaP MR Sink Plugin to create a MR Publisher and pass to custom {@link + * DMaaPMRRecordWriter} + * <p> + * @author Rajiv Singla . Creation Date: 1/27/2017. + */ +public class DMaaPMROutputFormat extends OutputFormat<String, NullWritable> { + + @Override + public RecordWriter<String, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + final Configuration configuration = context.getConfiguration(); + final DMaaPMRPublisherConfig publisherConfig = DMaaPSinkConfigMapper.map(configuration); + final DMaaPMRPublisher publisher = DMaaPMRFactory.create().createPublisher(publisherConfig); + return new DMaaPMRRecordWriter(publisher); + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + // do nothing + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return new NoOpOutputCommitter(); + } + + /** + * A dummy implementation for {@link OutputCommitter} that does nothing. + */ + protected static class NoOpOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) throws IOException { + // no op + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + // no op + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + // no op + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + // no op + } + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java new file mode 100644 index 0000000..a78d42f --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java @@ -0,0 +1,116 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import co.cask.cdap.api.data.batch.OutputFormatProvider; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * DMaaP MR Output Format Provider used to create Batch Sink Plugin + * <p> + * @author Rajiv Singla . Creation Date: 1/27/2017. + */ +public class DMaaPMROutputFormatProvider implements OutputFormatProvider { + + private final Map<String, String> sinkConfig; + + + public DMaaPMROutputFormatProvider(DMaaPMRSinkPluginConfig sinkPluginConfig) { + + // initialize Sink Config - with DMaaP MR Publisher config values + sinkConfig = new LinkedHashMap<>(); + + // Required fields for sink config + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.HOST_NAME, sinkPluginConfig.getHostName()); + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, sinkPluginConfig.getTopicName()); + + final Integer configPortNumber = sinkPluginConfig.getPortNumber(); + if (configPortNumber != null) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, configPortNumber.toString()); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, + AnalyticsConstants.DEFAULT_PORT_NUMBER.toString()); + } + + final String configProtocol = sinkPluginConfig.getProtocol(); + if (ValidationUtils.isPresent(configProtocol)) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, configProtocol); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, AnalyticsConstants.DEFAULT_PROTOCOL); + } + + + final String configUserName = sinkPluginConfig.getUserName(); + if (ValidationUtils.isPresent(configUserName)) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, configUserName); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, AnalyticsConstants.DEFAULT_USER_NAME); + } + + final String configUserPass = sinkPluginConfig.getUserPassword(); + if (ValidationUtils.isPresent(configUserPass)) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, configUserPass); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, AnalyticsConstants.DEFAULT_USER_PASSWORD); + } + + final String configContentType = sinkPluginConfig.getContentType(); + if (ValidationUtils.isPresent(configContentType)) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, configContentType); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, AnalyticsConstants.DEFAULT_CONTENT_TYPE); + } + + + final Integer configMaxBatchSize = sinkPluginConfig.getMaxBatchSize(); + if (configMaxBatchSize != null) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, configMaxBatchSize.toString()); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, + String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE)); + } + + final Integer configMaxRecoveryQueueSize = sinkPluginConfig.getMaxRecoveryQueueSize(); + if (configMaxRecoveryQueueSize != null) { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, configMaxRecoveryQueueSize.toString()); + } else { + sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, + String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE)); + } + + } + + @Override + public String getOutputFormatClassName() { + return DMaaPMROutputFormat.class.getName(); + } + + @Override + public Map<String, String> getOutputFormatConfiguration() { + return sinkConfig; + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java new file mode 100644 index 0000000..ec0aded --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java @@ -0,0 +1,58 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A simple implementation of {@link RecordWriter} which writes messages to DMaaP MR topic + * <p> + * @author Rajiv Singla . Creation Date: 1/27/2017. + */ +public class DMaaPMRRecordWriter extends RecordWriter<String, NullWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRRecordWriter.class); + + private final DMaaPMRPublisher dMaaPMRPublisher; + + public DMaaPMRRecordWriter(DMaaPMRPublisher dMaaPMRPublisher) { + this.dMaaPMRPublisher = dMaaPMRPublisher; + } + + @Override + public void write(String message, NullWritable value) throws IOException, InterruptedException { + LOG.debug("Writing message to DMaaP MR Topic: {}", message); + dMaaPMRPublisher.publish(Arrays.asList(message)); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + dMaaPMRPublisher.flush(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java new file mode 100644 index 0000000..32ec251 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java @@ -0,0 +1,90 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.batch.Output; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.etl.api.Emitter; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.batch.BatchSink; +import co.cask.cdap.etl.api.batch.BatchSinkContext; +import org.apache.hadoop.io.NullWritable; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSinkPluginConfigValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Rajiv Singla . Creation Date: 1/26/2017. + */ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name("DMaaPMRSink") +@Description("A batch sink Plugin that publishes messages to DMaaP MR Topic.") +public class DMaaPMRSink extends BatchSink<StructuredRecord, String, NullWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSink.class); + + private final DMaaPMRSinkPluginConfig pluginConfig; + + public DMaaPMRSink(final DMaaPMRSinkPluginConfig pluginConfig) { + LOG.debug("Creating DMaaP MR Sink Plugin with plugin Config: {}", pluginConfig); + this.pluginConfig = pluginConfig; + } + + @Override + public void configurePipeline(final PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSinkPluginConfigValidator()); + // validates that input schema contains the field provided in Sink Message Column Name property + final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); + CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getMessageColumnName()); + } + + + @Override + public void prepareRun(BatchSinkContext context) throws Exception { + context.addOutput(Output.of(pluginConfig.getReferenceName(), new DMaaPMROutputFormatProvider(pluginConfig))); + } + + @Override + public void transform(StructuredRecord structuredRecord, + Emitter<KeyValue<String, NullWritable>> emitter) throws Exception { + // get incoming message from structured record + final String incomingMessage = structuredRecord.get(pluginConfig.getMessageColumnName()); + + // if incoming messages does not have message column name log warning as it should not happen + if (incomingMessage == null) { + LOG.warn("Column Name: {}, contains no message.Skipped for DMaaP MR Publishing....", + pluginConfig.getMessageColumnName()); + } else { + + // emit the messages as key + emitter.emit(new KeyValue<String, NullWritable>(incomingMessage, null)); + } + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java new file mode 100644 index 0000000..677b764 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java @@ -0,0 +1,37 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.common; + +/** + * Contract interface for all DCAE Analytics Plugin Schemas + * + * @author Rajiv Singla . Creation Date: 1/25/2017. + */ +public interface PluginSchema { + + /** + * Provides column name that will be used in Schema Definition + * + * @return Column name that will be used in Schema Definition + */ + String getSchemaColumnName(); + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java new file mode 100644 index 0000000..b85dc7d --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java @@ -0,0 +1,159 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import com.google.common.base.Objects; +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; + +import javax.annotation.Nullable; + +/** + * Base class for all DMaaP MR Configs + * <p> + * @author Rajiv Singla . Creation Date: 1/17/2017. + */ +public abstract class BaseDMaaPMRPluginConfig extends CDAPBasePluginConfig { + + @Description("DMaaP Message Router HostName") + @Macro + protected String hostName; + + @Description("DMaaP Message Router Host Port number. Defaults to Port 80") + @Nullable + @Macro + protected Integer portNumber; + + @Description("DMaaP Message Router Topic Name") + @Macro + protected String topicName; + + @Description("DMaaP Message Router HTTP Protocol e.g. HTTP or HTTPS. Defaults to HTTPS") + @Nullable + @Macro + protected String protocol; + + @Description("DMaaP Message Router User Name used for AAF Authentication. Defaults to no authentication") + @Nullable + @Macro + protected String userName; + + @Description("DMaaP Message Router User Password used for AAF Authentication. Defaults to no authentication") + @Nullable + @Macro + protected String userPassword; + + @Description("DMaaP Message Router Content Type. Defaults to 'application/json'") + @Nullable + @Macro + protected String contentType; + + + public BaseDMaaPMRPluginConfig(final String referenceName, final String hostName, final String topicName) { + this.referenceName = referenceName; + this.hostName = hostName; + this.topicName = topicName; + } + + /** + * Host Name for DMaaP MR Publisher or Subscriber + * + * @return host name + */ + public String getHostName() { + return hostName; + } + + /** + * Port Number for DMaaP MR Publisher or Subscriber + * + * @return port number + */ + @Nullable + public Integer getPortNumber() { + return portNumber; + } + + /** + * DMaaP MR Topic Name for Subscriber or Publisher + * + * @return topic name + */ + public String getTopicName() { + return topicName; + } + + + /** + * DMaaP MR HTTP or HTTPS protocol + * + * @return http or https protocol + */ + @Nullable + public String getProtocol() { + return protocol; + } + + /** + * User name used for DMaaP MR AAF Authentication + * + * @return User name for DMaaP MR AAF Authentication + */ + @Nullable + public String getUserName() { + return userName; + } + + /** + * User password used for DMaaP MR AAF Authentication + * + * @return User password used for DMaaP MR AAF Authentication + */ + @Nullable + public String getUserPassword() { + return userPassword; + } + + /** + * Content type used for DMaaP MR Topic e.g. 'application/json' + * + * @return content type for DMaaP MR Topic + */ + @Nullable + public String getContentType() { + return contentType; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("referenceName", referenceName) + .add("hostName", hostName) + .add("portNumber", portNumber) + .add("topicName", topicName) + .add("protocol", protocol) + .add("userName", userName) + .add("userPassword", "xxxx") + .add("contentType", contentType) + .toString(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java new file mode 100644 index 0000000..7de7532 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java @@ -0,0 +1,101 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import com.google.common.base.Objects; + +import javax.annotation.Nullable; + +/** + * DMaaP MR Publisher Config + * <p> + * @author Rajiv Singla . Creation Date: 1/17/2017. + */ +public class DMaaPMRSinkPluginConfig extends BaseDMaaPMRPluginConfig { + + private static final long serialVersionUID = 1L; + + @Description("Column name of input schema which contains the message that needs to be written to DMaaP MR Topic") + @Macro + protected String messageColumnName; + + @Description("DMaaP MR Publisher Max Batch Size. Defaults to no Batch") + @Nullable + @Macro + protected Integer maxBatchSize; + + @Description("DMaaP MR Publisher Recovery Queue Size. Default to 1000K messages which can be buffered in memory " + + "in case DMaaP MR Publisher is temporarily unavailable") + @Nullable + @Macro + protected Integer maxRecoveryQueueSize; + + // Required No Arg constructor + public DMaaPMRSinkPluginConfig() { + this(null, null, null, null); + } + + public DMaaPMRSinkPluginConfig(String referenceName, String hostName, String topicName, String messageColumnName) { + super(referenceName, hostName, topicName); + this.messageColumnName = messageColumnName; + } + + /** + * Column name of incoming Schema field that contains the message that needs to published to DMaaP MR Topic + * + * @return Column name of incoming schema which contains message that needs to published to DMaaP MR Topic + */ + public String getMessageColumnName() { + return messageColumnName; + } + + /** + * DMaaP MR Publisher Max Batch Size. + * + * @return DMaaP MR Publisher Max Batch Size + */ + @Nullable + public Integer getMaxBatchSize() { + return maxBatchSize; + } + + /** + * DMaaP MR Publisher Max Recovery Queue Size + * + * @return DMaaP MR Publisher Max Recovery Queue Size + */ + @Nullable + public Integer getMaxRecoveryQueueSize() { + return maxRecoveryQueueSize; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("super", super.toString()) + .add("messageColumnName", messageColumnName) + .add("maxBatchSize", maxBatchSize) + .add("maxRecoveryQueueSize", maxRecoveryQueueSize) + .toString(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java new file mode 100644 index 0000000..a91da35 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java @@ -0,0 +1,134 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import com.google.common.base.Objects; + +import javax.annotation.Nullable; + +/** + * DMaaP MR Subscriber Config + * <p> + * @author Rajiv Singla . Creation Date: 1/17/2017. + */ +public class DMaaPMRSourcePluginConfig extends BaseDMaaPMRPluginConfig { + + private static final long serialVersionUID = 1L; + + @Description("DMaaP MR Polling Interval in MS") + @Macro + protected Integer pollingInterval; + + @Description("DMaaP Message Router Subscriber Consumer ID. Defaults to some randomly created userID") + @Nullable + @Macro + protected String consumerId; + + @Description("DMaaP Message Router Subscriber Consumer Group. Defaults to some randomly created user Group") + @Nullable + @Macro + protected String consumerGroup; + + @Description("DMaaP Message Router Subscriber Timeout in MS. Defaults to no timeout") + @Nullable + @Macro + protected Integer timeoutMS; + + @Description("DMaaP Message Router Subscriber Message Limit. Defaults to no message limit") + @Nullable + @Macro + protected Integer messageLimit; + + // Required No Arg constructor + public DMaaPMRSourcePluginConfig() { + this(null, null, null, 0); + } + + public DMaaPMRSourcePluginConfig(String referenceName, String hostName, String topicName, Integer pollingInterval) { + super(referenceName, hostName, topicName); + this.pollingInterval = pollingInterval; + } + + /** + * DMaaP MR Subscriber Polling interval + * + * @return DMaaP MR Subscriber Polling interval + */ + public Integer getPollingInterval() { + return pollingInterval; + } + + /** + * DMaaP MR Subscriber Consumer ID + * + * @return DMaaP MR Subscriber Consumer ID + */ + @Nullable + public String getConsumerId() { + return consumerId; + } + + /** + * DMaaP MR Subscriber Consumer Group + * + * @return DMaaP MR Subscriber Consumer Group + */ + @Nullable + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * DMaaP MR Subscriber Timeout in MS + * + * @return DMaaP MR Subscriber Timeout in MS + */ + @Nullable + public Integer getTimeoutMS() { + return timeoutMS; + } + + /** + * DMaaP MR Subscriber message limit + * + * @return DMaaP MR Subscriber Message limit + */ + @Nullable + public Integer getMessageLimit() { + return messageLimit; + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("super", super.toString()) + .add("pollingInterval", pollingInterval) + .add("consumerId", consumerId) + .add("consumerGroup", consumerGroup) + .add("timeoutMS", timeoutMS) + .add("messageLimit", messageLimit) + .toString(); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java new file mode 100644 index 0000000..8bb768f --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java @@ -0,0 +1,125 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import co.cask.cdap.api.annotation.Name; +import com.google.common.base.Objects; +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; + +/** + * Configuration for Json Path Filter Plugin + * + * @author Rajiv Singla . Creation Date: 3/2/2017. + */ +public class JsonPathFilterPluginConfig extends CDAPBasePluginConfig { + + private static final long serialVersionUID = 1L; + + @Name("incomingJsonFieldName") + @Description("Input schema field name that contain JSON used for filtering") + @Macro + protected String incomingJsonFieldName; + + + @Name("outputSchemaFieldName") + @Description("Name of the nullable boolean schema field name that will contain result of the filter matching") + @Macro + protected String outputSchemaFieldName; + + + @Name("jsonFilterMappings") + @Macro + @Description("Filters incoming JSON based on given filter mappings - in terms of JSON path and expected values." + + "Right hand side contains JSON path. Left hand side contains semicolon (';') separated expected values " + + "for that JSON Path. If all provided JSON Path mappings and corresponding values matches - " + + "output schema field will be marked as true") + protected String jsonFilterMappings; + + + @Name("schema") + @Description("Output Schema") + protected String schema; + + + public JsonPathFilterPluginConfig(final String referenceName, final String incomingJsonFieldName, + final String outputSchemaFieldName, final String jsonFilterMappings, + final String schema) { + this.referenceName = referenceName; + this.incomingJsonFieldName = incomingJsonFieldName; + this.outputSchemaFieldName = outputSchemaFieldName; + this.jsonFilterMappings = jsonFilterMappings; + this.schema = schema; + } + + /** + * Provides incoming plugin schema field name which contains json used to apply filter + * + * @return name of incoming schema field containing JSON to be filtered + */ + public String getIncomingJsonFieldName() { + return incomingJsonFieldName; + } + + /** + * Provides plugin output schema filed name that will contain result of filter application + * It must be nullable and boolean type + * + * @return name of outgoing schema filed name that will contain filtering result + */ + public String getOutputSchemaFieldName() { + return outputSchemaFieldName; + } + + /** + * Provides JSON filter mappings. LHS contains JSON path value and RHS contains expected + * values separated by semicolon + * + * + * @return String for JSON filter mappings + */ + public String getJsonFilterMappings() { + return jsonFilterMappings; + } + + /** + * Output Schema + * + * @return output schema string + */ + public String getSchema() { + return schema; + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("referenceName", referenceName) + .add("incomingJsonFieldName", incomingJsonFieldName) + .add("outputSchemaFieldName", outputSchemaFieldName) + .add("jsonFilterMappings", jsonFilterMappings) + .add("schema", schema) + .toString(); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java new file mode 100644 index 0000000..d9c2b7a --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java @@ -0,0 +1,154 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import com.google.common.base.Objects; +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; + +import javax.annotation.Nullable; + +/** + * Simple TCA Plugin Configuration + * <p> + * @author Rajiv Singla . Creation Date: 2/13/2017. + */ +public class SimpleTCAPluginConfig extends CDAPBasePluginConfig { + + private static final long serialVersionUID = 1L; + + @Description("Field name containing VES Message") + @Macro + protected String vesMessageFieldName; + + @Description("Policy JSON that need to be applied to VES Message") + @Macro + protected String policyJson; + + @Description("Name of the output field that will contain the alert") + @Macro + protected String alertFieldName; + + @Description("Name of the output field that will contain message type: INAPPLICABLE, COMPLIANT, NON_COMPLIANT") + @Macro + protected String messageTypeFieldName; + + @Description("Specifies the output schema") + protected String schema; + + @Description("Enables") + @Nullable + @Macro + protected Boolean enableAlertCEFFormat; + + + /** + * Creates an instance of TCA Plugin Configs + * + * @param vesMessageFieldName Ves message field name from incoming plugin schema + * @param policyJson TCA Policy Json String + * @param alertFieldName Alert field name that will be added in TCA plugin output schema + * @param messageTypeFieldName Message type field name that will be added in TCA plugin output schema + * @param schema TCA Plugin output schema + * @param enableAlertCEFFormat enables alert message to be formatted in VES format + */ + public SimpleTCAPluginConfig(final String vesMessageFieldName, final String policyJson, + final String alertFieldName, final String messageTypeFieldName, + final String schema, final Boolean enableAlertCEFFormat) { + this.vesMessageFieldName = vesMessageFieldName; + this.policyJson = policyJson; + this.alertFieldName = alertFieldName; + this.messageTypeFieldName = messageTypeFieldName; + this.schema = schema; + this.enableAlertCEFFormat = enableAlertCEFFormat; + } + + /** + * Name of the field containing VES Message + * + * @return VES Message field name + */ + public String getVesMessageFieldName() { + return vesMessageFieldName; + } + + /** + * Policy Json String + * + * @return Policy Json String + */ + public String getPolicyJson() { + return policyJson; + } + + + /** + * Alert Field name in outgoing schema + * + * @return alert field name in outgoing schema + */ + public String getAlertFieldName() { + return alertFieldName; + } + + /** + * Returns output schema string + * + * @return output schema string + */ + public String getSchema() { + return schema; + } + + /** + * Return TCA message type - INAPPLICABLE, COMPLIANT, NON_COMPLIANT + * + * @return tca message type + */ + public String getMessageTypeFieldName() { + return messageTypeFieldName; + } + + + /** + * Returns if Alert output in Common Event format + * + * @return true if alert output is in common event format + */ + @Nullable + public Boolean getEnableAlertCEFFormat() { + return enableAlertCEFFormat; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("referenceName", referenceName) + .add("vesMessageFieldName", vesMessageFieldName) + .add("policyJson", policyJson) + .add("alertFieldName", alertFieldName) + .add("messageTypeFieldName", messageTypeFieldName) + .add("schema", schema) + .add("enableAlertCEFFormat", true) + .toString(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java new file mode 100644 index 0000000..5874d0a --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java @@ -0,0 +1,59 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap; + +import co.cask.cdap.api.data.schema.Schema; +import org.openecomp.dcae.apod.analytics.cdap.plugins.common.PluginSchema; + +/** + * Output Schema for DMaaP MR Source Plugin + * + * @author Rajiv Singla . Creation Date: 1/25/2017. + */ +public enum DMaaPSourceOutputSchema implements PluginSchema { + + TIMESTAMP("ts"), + RESPONSE_CODE("responseCode"), + RESPONSE_MESSAGE("responseMessage"), + FETCHED_MESSAGE("message"); + + private String schemaColumnName; + + DMaaPSourceOutputSchema(String schemaColumnName) { + this.schemaColumnName = schemaColumnName; + } + + @Override + public String getSchemaColumnName() { + return schemaColumnName; + } + + public static Schema getSchema() { + return Schema.recordOf( + "DMaaPMRSourcePluginResponse", + Schema.Field.of(TIMESTAMP.getSchemaColumnName(), Schema.of(Schema.Type.LONG)), + Schema.Field.of(RESPONSE_CODE.getSchemaColumnName(), Schema.of(Schema.Type.INT)), + Schema.Field.of(RESPONSE_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING)), + Schema.Field.of(FETCHED_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING)) + ); + } + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java new file mode 100644 index 0000000..b915ade --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java @@ -0,0 +1,175 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.format.StructuredRecord.Builder; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.StageMetrics; +import co.cask.cdap.etl.api.batch.SparkCompute; +import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.SimpleTCAPluginConfigValidator; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; +import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Rajiv Singla . Creation Date: 2/13/2017. + */ + +@Plugin(type = SparkCompute.PLUGIN_TYPE) +@Name("SimpleTCAPlugin") +@Description("Used to create TCA (Threshold Crossing Alert) based on given Policy") +@SuppressFBWarnings("SE_INNER_CLASS") +public class SimpleTCAPlugin extends SparkCompute<StructuredRecord, StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPlugin.class); + private static final long serialVersionUID = 1L; + + private final SimpleTCAPluginConfig pluginConfig; + + /** + * Create an instance of Simple TCA Plugin with give Simple TCA Plugin Config + * + * @param pluginConfig Simple TCA Plugin Config + */ + public SimpleTCAPlugin(SimpleTCAPluginConfig pluginConfig) { + this.pluginConfig = pluginConfig; + LOG.info("Creating instance of Simple TCA Plugin with plugin config: {}", pluginConfig); + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + ValidationUtils.validateSettings(pluginConfig, new SimpleTCAPluginConfigValidator()); + final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); + CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getVesMessageFieldName()); + CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema()); + } + + @Override + public JavaRDD<StructuredRecord> transform(final SparkExecutionPluginContext context, + final JavaRDD<StructuredRecord> input) throws Exception { + final StageMetrics metrics = context.getMetrics(); + + LOG.debug("Invoking Spark Transform for Simple TCA Plugin"); + return input.map(new Function<StructuredRecord, StructuredRecord>() { + + @Override + public StructuredRecord call(StructuredRecord inputStructuredRecord) throws Exception { + TCACalculatorMessageType calculatorMessageType; + String alertMessage = null; + + // Get input structured record + final String cefMessage = inputStructuredRecord.get(pluginConfig.getVesMessageFieldName()); + + // Get TCA Policy + final TCAPolicy tcaPolicy = CDAPPluginUtils.readValue(pluginConfig.getPolicyJson(), TCAPolicy.class); + + // create initial processor context + final TCACEFProcessorContext initialProcessorContext = + new TCACEFProcessorContext(cefMessage, tcaPolicy); + + final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor(); + final TCACEFProcessorContext jsonProcessorContext = + jsonProcessor.processMessage(initialProcessorContext); + + if (jsonProcessorContext.getCEFEventListener() != null) { + + LOG.debug("Json to CEF parsing successful. Parsed object {}", + jsonProcessorContext.getCEFEventListener()); + + // compute violations + final TCACEFProcessorContext processorContextWithViolations = + TCAUtils.computeThresholdViolations(jsonProcessorContext); + + // if violation are found then create alert message + if (processorContextWithViolations.canProcessingContinue()) { + + alertMessage = TCAUtils.createTCAAlertString(processorContextWithViolations, + pluginConfig.getReferenceName(), pluginConfig.getEnableAlertCEFFormat()); + calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; + + LOG.debug("VES Threshold Violation Detected.An alert message is be generated: {}", + alertMessage); + + final MetricsPerFunctionalRole metricsPerFunctionalRole = + processorContextWithViolations.getMetricsPerFunctionalRole(); + if (metricsPerFunctionalRole != null + && metricsPerFunctionalRole.getThresholds() != null + && metricsPerFunctionalRole.getThresholds().get(0) != null) { + final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0); + LOG.debug("CEF Message: {}, Violated Threshold: {}", cefMessage, violatedThreshold); + } + + metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); + + } else { + LOG.debug("No Threshold Violation Detected. No alert will be generated."); + calculatorMessageType = TCACalculatorMessageType.COMPLIANT; + metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); + } + + } else { + LOG.info("Unable to parse provided json message to CEF format. Invalid message: {}", cefMessage); + calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; + } + + LOG.debug("Calculator message type: {} for message: {}", calculatorMessageType, cefMessage); + + final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema()); + + // create new output record builder and copy any input record values to output record builder + final Builder outputRecordBuilder = + CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord); + + // add alert field + final Builder outputRecordBuilderWithAlertField = + CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder, + outputSchema, pluginConfig.getAlertFieldName(), alertMessage); + + // add message field type + final Builder outRecordBuilderWithMessageTypeField = + CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilderWithAlertField, + outputSchema, pluginConfig.getMessageTypeFieldName(), calculatorMessageType.toString()); + + return outRecordBuilderWithMessageTypeField.build(); + } + }); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java new file mode 100644 index 0000000..aac7fa6 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java @@ -0,0 +1,118 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.metrics.Metrics; +import com.google.common.base.Optional; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records + * <p> + * @author Rajiv Singla . Creation Date: 1/19/2017. + */ +public class DMaaPMRReceiver extends Receiver<StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRReceiver.class); + private static final long serialVersionUID = 1L; + + private final DMaaPMRSourcePluginConfig pluginConfig; + private final Metrics metrics; + + public DMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig, + final Metrics metrics) { + super(storageLevel); + this.pluginConfig = pluginConfig; + this.metrics = metrics; + LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig); + } + + @Override + public void onStart() { + + // create DMaaP MR Subscriber + final DMaaPMRSubscriber subscriber = + DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); + + // Start a new thread with indefinite loop until receiver is stopped + new Thread() { + @Override + public void run() { + while (!isStopped()) { + storeStructuredRecords(subscriber); + try { + final Integer pollingInterval = pluginConfig.getPollingInterval(); + LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); + TimeUnit.MILLISECONDS.sleep(pollingInterval); + } catch (InterruptedException e) { + final String errorMessage = String.format( + "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + } + }.start(); + + } + + @Override + public void onStop() { + LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig); + } + + /** + * Fetches records from DMaaP MR Subscriber and store them as structured records + * + * @param subscriber DMaaP MR Subscriber Instance + */ + public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) { + + LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic"); + + // Fetch messages from DMaaP MR Topic + final Optional<List<String>> subscriberMessagesOptional = + DMaaPMRUtils.getSubscriberMessages(subscriber, metrics); + + // store records + if (subscriberMessagesOptional.isPresent()) { + final List<String> messages = subscriberMessagesOptional.get(); + for (final String message : messages) { + store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(message)); + } + LOG.debug("Stored DMaaP Subscriber messages as Structured Records. Message count {}", messages.size()); + } + } + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java new file mode 100644 index 0000000..a9ecfea --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java @@ -0,0 +1,70 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.streaming.StreamingContext; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema; +import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSourcePluginConfigValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DMaaP MR Source Plugin which polls DMaaP MR topic at frequent intervals + * <p> + * @author Rajiv Singla . Creation Date: 1/18/2017. + */ +@Plugin(type = StreamingSource.PLUGIN_TYPE) +@Name("DMaaPMRSource") +@Description("Fetches DMaaP MR Messages at regular intervals") +public class DMaaPMRSource extends StreamingSource<StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSource.class); + private static final long serialVersionUID = 1L; + + private final DMaaPMRSourcePluginConfig pluginConfig; + + public DMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) { + LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig); + this.pluginConfig = pluginConfig; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSourcePluginConfigValidator()); + pipelineConfigurer.getStageConfigurer().setOutputSchema(DMaaPSourceOutputSchema.getSchema()); + } + + @Override + public JavaDStream<StructuredRecord> getStream(final StreamingContext streamingContext) throws Exception { + return streamingContext.getSparkStreamingContext().receiverStream( + new DMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig, streamingContext.getMetrics())); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java new file mode 100644 index 0000000..a318406 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java @@ -0,0 +1,132 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.data.format.StructuredRecord; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue; +import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString; + +/** + * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records + * <p> + * @author Rajiv Singla . Creation Date: 1/19/2017. + */ +public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class); + private static final long serialVersionUID = 1L; + + private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json"; + private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE = + new TypeReference<List<EventListener>>() { + }; + + private final DMaaPMRSourcePluginConfig pluginConfig; + + public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) { + super(storageLevel); + this.pluginConfig = pluginConfig; + LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig); + } + + @Override + public void onStart() { + + // create DMaaP MR Subscriber + final DMaaPMRSubscriber subscriber = + DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); + storeStructuredRecords(subscriber); + + } + + @Override + public void onStop() { + LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig); + } + + /** + * Fetches records from DMaaP MR Subscriber and store them as structured records + * + * @param subscriber DMaaP MR Subscriber Instance + */ + public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) { + + LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic"); + + try (InputStream resourceAsStream = + Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) { + + if (resourceAsStream == null) { + LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION); + throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException()); + } + + List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE); + + final int totalMessageCount = eventListeners.size(); + LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount); + + int i = 1; + for (EventListener eventListener : eventListeners) { + if (isStopped()) { + return; + } + final String eventListenerString = writeValueAsString(eventListener); + LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount); + store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString)); + i++; + try { + TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval()); + } catch (InterruptedException e) { + LOG.error("Error while sleeping"); + throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e); + } + + } + + LOG.debug("Finished writing mock messages to CDAP Stream"); + + } catch (IOException e) { + LOG.error("Error while parsing json file"); + throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e); + } + } + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java new file mode 100644 index 0000000..e0be12f --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java @@ -0,0 +1,73 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.streaming.StreamingContext; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mock implementation of DMaaP MR Receiver which sends mock ves messages + * <p> + * @author Rajiv Singla . Creation Date: 2/15/2017. + */ +@Plugin(type = StreamingSource.PLUGIN_TYPE) +@Name("MockDMaaPMRSource") +@Description("Fetches DMaaP MR Messages at regular intervals") +public class MockDMaaPMRSource extends StreamingSource<StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRSource.class); + private static final long serialVersionUID = 1L; + + private final DMaaPMRSourcePluginConfig pluginConfig; + + public MockDMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) { + LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig); + this.pluginConfig = pluginConfig; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + final Integer pollingInterval = pluginConfig.getPollingInterval(); + if (pollingInterval == null) { + final String errorMessage = "Polling Interval field must be present"; + throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } else { + LOG.info("Mock Message will be send every ms: {}", pollingInterval); + } + } + + @Override + public JavaDStream<StructuredRecord> getStream(final StreamingContext streamingContext) throws Exception { + return streamingContext.getSparkStreamingContext().receiverStream( + new MockDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig)); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java new file mode 100644 index 0000000..ae0d00a --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java @@ -0,0 +1,134 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.transform.filter; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.Emitter; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.Transform; +import co.cask.cdap.etl.api.TransformContext; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.JsonPathFilterPluginConfigValidator; +import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext; +import org.openecomp.dcae.apod.analytics.common.utils.MessageProcessorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * Json Path filter Plugin filters incoming schema field based of given json path expected values + * <p> + * @author Rajiv Singla . Creation Date: 3/2/2017. + */ + +@Plugin(type = Transform.PLUGIN_TYPE) +@Name("JsonPathFilter") +@Description("Filters incoming schema field based of given json path expected values") +public class JsonPathFilter extends Transform<StructuredRecord, StructuredRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(JsonPathFilter.class); + + private final JsonPathFilterPluginConfig pluginConfig; + private final Map<String, Set<String>> jsonFilterPathMappings; + + public JsonPathFilter(final JsonPathFilterPluginConfig pluginConfig) { + this.pluginConfig = pluginConfig; + jsonFilterPathMappings = Maps.newHashMap(); + LOG.info("Created instance of Json Path Filter Plugin with plugin config: {}", pluginConfig); + } + + + @Override + public void initialize(final TransformContext context) throws Exception { + super.initialize(context); + populateJsonFilterMapping(); + } + + @Override + public void configurePipeline(final PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + ValidationUtils.validateSettings(pluginConfig, new JsonPathFilterPluginConfigValidator()); + final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); + CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getIncomingJsonFieldName()); + populateJsonFilterMapping(); + CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema()); + } + + @Override + public void transform(final StructuredRecord inputStructuredRecord, final Emitter<StructuredRecord> emitter) + throws Exception { + + // get input json message + final String jsonMessage = inputStructuredRecord.get(pluginConfig.getIncomingJsonFieldName()); + + // process Json Filter Mappings + final JsonMessageFilterProcessorContext jsonMessageFilterProcessorContext = + MessageProcessorUtils.processJsonFilterMappings(jsonMessage, jsonFilterPathMappings); + + // create new output record builder and copy any input Structured record values to output record builder + final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema()); + final StructuredRecord.Builder outputRecordBuilder = + CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord); + + // add json filter matched field + final StructuredRecord.Builder outputRecordBuilderWithMatchedField = + CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder, + outputSchema, pluginConfig.getOutputSchemaFieldName(), + jsonMessageFilterProcessorContext.getMatched()); + + // emit structured record with filtering matched field + final StructuredRecord outputStructuredRecord = outputRecordBuilderWithMatchedField.build(); + + LOG.debug("Incoming Json Message: {}.Json Path Filter Output Matched Field: {}", jsonMessage, + outputStructuredRecord.get(pluginConfig.getOutputSchemaFieldName())); + + emitter.emit(outputStructuredRecord); + + } + + /** + * Populates Json Filter Mapping + */ + private void populateJsonFilterMapping() { + final Map<String, String> fieldMappings = + CDAPPluginUtils.extractFieldMappings(pluginConfig.getJsonFilterMappings()); + if (fieldMappings.isEmpty()) { + throw new IllegalArgumentException("No Field Mapping found. Invalid Filter mapping configuration"); + } + final Splitter semiColonSplitter = Splitter.on(";"); + for (Map.Entry<String, String> fieldMappingEntry : fieldMappings.entrySet()) { + jsonFilterPathMappings.put(fieldMappingEntry.getKey(), + Sets.newLinkedHashSet(semiColonSplitter.split(fieldMappingEntry.getValue()))); + } + LOG.info("Input Json Filter Mappings: {}", jsonFilterPathMappings); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java new file mode 100644 index 0000000..af191c5 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java @@ -0,0 +1,295 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; + +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.etl.api.PipelineConfigurer; +import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * @author Rajiv Singla . Creation Date: 1/26/2017. + */ +public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils { + + private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class); + + public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() { + @Override + public Schema.Type apply(@Nonnull Schema schema) { + return schema.getType(); + } + }; + + + + private CDAPPluginUtils() { + // private constructor + } + + /** + * Validates if CDAP Schema contains expected fields + * + * @param schema schema that need to be validated + * @param expectedFields fields that are expected to be in the schema + */ + + public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) { + + LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields)); + + if (schema == null) { + // If input schema is null then no validation possible + LOG.warn("Input Schema is null. No validation possible"); + } else { + // Check if expected fields are indeed present in the schema + for (String expectedField : expectedFields) { + final Schema.Field schemaField = schema.getField(expectedField); + if (schemaField == null) { + final String errorMessage = String.format( + "Unable to find expected field: %s, in schema: %s", expectedField, schema); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + } + LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema, + Arrays.toString(expectedFields)); + } + } + + + /** + * Creates a new Structured Record containing DMaaP MR fetched message + * + * @param message DMaaP MR fetch message + * + * @return Structured record containing DMaaP MR Message + */ + public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) { + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema()); + recordBuilder + .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime()) + .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200) + .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK") + .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message); + return recordBuilder.build(); + } + + + /** + * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord} + * + * @param outputSchema output Schema + * @param inputStructuredRecord input Structured Record + * + * @return output Structured Record builder with pre populated values from input structured record + */ + public static StructuredRecord.Builder createOutputStructuredRecordBuilder( + @Nonnull final Schema outputSchema, + @Nonnull final StructuredRecord inputStructuredRecord) { + + // Get input structured Record Schema + final Schema inputSchema = inputStructuredRecord.getSchema(); + // Create new instance of output Structured Record Builder from output Schema + final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema); + + // iterate over input fields and if output schema has field with same name copy the value to out record builder + for (Schema.Field inputField : inputSchema.getFields()) { + final String inputFieldName = inputField.getName(); + if (outputSchema.getField(inputFieldName) != null) { + outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName)); + } + } + + return outputStructuredRecordBuilder; + } + + + /** + * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name + * + * @param structuredRecordBuilder structured record builder + * @param structuredRecordSchema schema for structured record builder + * @param fieldName field name + * @param fieldValue field value + * + * @return structured record builder with populated field name and value if schema contains field name + */ + public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder( + @Nonnull final StructuredRecord.Builder structuredRecordBuilder, + @Nonnull final Schema structuredRecordSchema, + @Nonnull final String fieldName, + final Object fieldValue) { + + // check if schema contains field Name + if (structuredRecordSchema.getField(fieldName) != null) { + structuredRecordBuilder.set(fieldName, fieldValue); + } else { + LOG.info("Unable to populate value for field Name: {} with field value: {}. " + + "Schema Fields: {} does not contain field name: {}", + fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName); + } + + return structuredRecordBuilder; + } + + + /** + * Validates that given schema String has fieldName of expected type. If field does not exist in given schema + * then validation will pass with warning. If field does exist in given schema then this validation will return + * true if field type is same as expected type else false + * + * @param schemaString CDAP Plugin output or input schema string + * @param fieldName field name + * @param expectedFieldType expected schema field type + * + * @return true if field type matches expected field type else false. If field does not exist in + * give schema validation will pass but will generate a warning message + */ + public static boolean validateSchemaFieldType(@Nonnull final String schemaString, + @Nonnull final String fieldName, + @Nonnull final Schema.Type expectedFieldType) { + + try { + // parse given schema String + final Schema outputSchema = Schema.parseJson(schemaString); + final Schema.Field schemaField = outputSchema.getField(fieldName); + + // if given schema does contain field then validated fieldName type + if (schemaField != null) { + + final List<Schema> schemas = new LinkedList<>(); + + // if it is a union type then grab all union schemas + if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) { + final List<Schema> unionFieldSchemas = + outputSchema.getField(fieldName).getSchema().getUnionSchemas(); + schemas.addAll(unionFieldSchemas); + } else { + // if not union type the just get the field schema + final Schema fieldSchema = outputSchema.getField(fieldName).getSchema(); + schemas.add(fieldSchema); + } + + // get all schema types + final List<Schema.Type> fieldTypes = + Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION); + + // if all schema types does not contain expected field type then return false + if (!fieldTypes.contains(expectedFieldType)) { + LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}", + fieldName, expectedFieldType, outputSchema); + return false; + } + + // field type validation passed + LOG.debug("Successfully validated fieldName: {} is of expected Type: {}", + fieldName, expectedFieldType); + + return true; + + } else { + + // if field does not exist then the validation will pass but will generate warning message + LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}", + fieldName, outputSchema); + return true; + } + + } catch (IOException e) { + final String errorMessage = + String.format("Unable to parse schema: %s for field type validation. " + + "Field Name: %s, Expected Field Type: %s Exception: %s", + schemaString, fieldName, expectedFieldType, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + } + + + /** + * Parses provided schema String as Schema object and set it as output Schema format + * + * @param pipelineConfigurer plugin pipeline configurer + * @param schemaString schema String to be set as output schema + */ + public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) { + try { + final Schema outputSchema = Schema.parseJson(schemaString); + pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema); + } catch (IOException e) { + final String errorMessage = String.format( + "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + } + + + /** + * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument + * exception will be thrown + * + * @param mappingFieldString field Mapping String + * + * @return map containing mapping key values + */ + public static Map<String, String> extractFieldMappings(final String mappingFieldString) { + final Map<String, String> fieldMappings = Maps.newHashMap(); + if (StringUtils.isNotBlank(mappingFieldString)) { + final Splitter commaSplitter = Splitter.on(","); + for (String fieldMapping : commaSplitter.split(mappingFieldString)) { + final String[] keyValueMappings = fieldMapping.split(":"); + if (keyValueMappings.length != 2 || + StringUtils.isBlank(keyValueMappings[0]) || + StringUtils.isBlank(keyValueMappings[1])) { + final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " + + "be present in mappings: " + mappingFieldString; + throw new DCAEAnalyticsRuntimeException( + errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim()); + } + } + return fieldMappings; + } + + + + +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java new file mode 100644 index 0000000..ebe7d49 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java @@ -0,0 +1,112 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; + +import com.google.common.base.Function; +import org.apache.hadoop.conf.Configuration; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + +/** + * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig} + * <p> + * @author Rajiv Singla . Creation Date: 1/26/2017. + */ +public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> { + + /** + * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig} + * + * @param sinkPluginConfig DMaaP Sink Plugin Config + * + * @return DMaaP MR Publisher Config + */ + public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) { + return new DMaaPSinkConfigMapper().apply(sinkPluginConfig); + } + + /** + * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig} + * + * @param configuration Hadoop Configuration containing DMaaP MR Sink field values + * + * @return DMaaP MR Publisher Config + */ + @Nonnull + @Override + public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) { + + // Create a new publisher settings builder + final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME); + final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME); + + if (isEmpty(hostName) || isEmpty(topicName)) { + throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present"); + } + + final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = + new DMaaPMRPublisherConfig.Builder(hostName, topicName); + + // Setup up any optional publisher parameters if they are present + final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER); + if (portNumber != null) { + publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber)); + } + + final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL); + if (isPresent(protocol)) { + publisherConfigBuilder.setProtocol(protocol); + } + + final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME); + if (isPresent(userName)) { + publisherConfigBuilder.setUserName(userName); + } + + final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS); + if (isPresent(userPassword)) { + publisherConfigBuilder.setUserPassword(userPassword); + } + + final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE); + if (isPresent(contentType)) { + publisherConfigBuilder.setContentType(contentType); + } + + final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE); + if (maxBatchSize != null) { + publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize)); + } + + final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE); + if (maxRecoveryQueueSize != null) { + publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize)); + } + + return publisherConfigBuilder.build(); + + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java new file mode 100644 index 0000000..8717632 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java @@ -0,0 +1,118 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; + +import com.google.common.base.Function; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + +/** + * Function that converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} + * <p> + * @author Rajiv Singla . Creation Date: 1/18/2017. + */ +public class DMaaPSourceConfigMapper implements Function<DMaaPMRSourcePluginConfig, DMaaPMRSubscriberConfig> { + + /** + * Static factory method to map {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} + * + * @param pluginConfig DMaaP MR Souce Plugin Config + * + * @return DMaaP MR Subscriber Config + */ + public static DMaaPMRSubscriberConfig map(final DMaaPMRSourcePluginConfig pluginConfig) { + return new DMaaPSourceConfigMapper().apply(pluginConfig); + } + + /** + * Converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} object + * + * @param sourcePluginConfig DMaaP MR Source Plugin Config + * + * @return DMaaP MR Subscriber Config + */ + @Nonnull + @Override + public DMaaPMRSubscriberConfig apply(@Nonnull DMaaPMRSourcePluginConfig sourcePluginConfig) { + + // Create a new subscriber settings builder + final String hostName = sourcePluginConfig.getHostName(); + final String topicName = sourcePluginConfig.getTopicName(); + if (isEmpty(hostName) || isEmpty(topicName)) { + throw new IllegalStateException("DMaaP MR Source Host Name and Topic Name must be present"); + } + final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder( + hostName, topicName); + + // Setup up any optional subscriber parameters if they are present + final Integer subscriberHostPortNumber = sourcePluginConfig.getPortNumber(); + if (subscriberHostPortNumber != null) { + subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber); + } + + final String subscriberProtocol = sourcePluginConfig.getProtocol(); + if (isPresent(subscriberProtocol)) { + subscriberConfigBuilder.setProtocol(subscriberProtocol); + } + + final String subscriberUserName = sourcePluginConfig.getUserName(); + if (isPresent(subscriberUserName)) { + subscriberConfigBuilder.setUserName(subscriberUserName); + } + + final String subscriberUserPassword = sourcePluginConfig.getUserPassword(); + if (isPresent(subscriberUserPassword)) { + subscriberConfigBuilder.setUserPassword(subscriberUserPassword); + } + + final String subscriberContentType = sourcePluginConfig.getContentType(); + if (isPresent(subscriberContentType)) { + subscriberConfigBuilder.setContentType(subscriberContentType); + } + + final String subscriberConsumerId = sourcePluginConfig.getConsumerId(); + if (isPresent(subscriberConsumerId)) { + subscriberConfigBuilder.setConsumerId(subscriberConsumerId); + } + + final String subscriberConsumerGroup = sourcePluginConfig.getConsumerGroup(); + if (isPresent(subscriberConsumerGroup)) { + subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup); + } + + final Integer subscriberTimeoutMS = sourcePluginConfig.getTimeoutMS(); + if (subscriberTimeoutMS != null) { + subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS); + } + final Integer subscriberMessageLimit = sourcePluginConfig.getMessageLimit(); + if (subscriberMessageLimit != null) { + subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit); + } + + // return Subscriber config + return subscriberConfigBuilder.build(); + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java new file mode 100644 index 0000000..e24f940 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java @@ -0,0 +1,72 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; + +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.BaseDMaaPMRPluginConfig; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +/** + * Validates plugin config values which are common in DMaaP MR Configs - {@link BaseDMaaPMRPluginConfig} + * <p> + * @author Rajiv Singla . Creation Date: 1/23/2017. + * + * @param <T> {@link BaseDMaaPMRPluginConfig} Sub classes + */ +public abstract class BaseDMaaPMRPluginConfigValidator<T extends BaseDMaaPMRPluginConfig> implements + CDAPAppSettingsValidator<T, GenericValidationResponse<T>> { + + private static final long serialVersionUID = 1L; + + /** + * Validates the {@link BaseDMaaPMRPluginConfig} parameters + * + * @param baseDMaaPMRPluginConfig DMaaP MR Plugin Config + * + * @return Validation Response containing validation errors if any + */ + @Override + public GenericValidationResponse<T> validateAppSettings(final T baseDMaaPMRPluginConfig) { + + final GenericValidationResponse<T> validationResponse = new GenericValidationResponse<>(); + + if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getHostName())) { + validationResponse.addErrorMessage( + "hostName", + "DMaaPMRPluginConfig - hostname field is undefined: " + baseDMaaPMRPluginConfig); + } + + if (baseDMaaPMRPluginConfig.getPortNumber() == null) { + validationResponse.addErrorMessage( + "port Number", + "DMaaPMRPluginConfig - host port number field is undefined: " + baseDMaaPMRPluginConfig); + } + + if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getTopicName())) { + validationResponse.addErrorMessage( + "topic Name", + "DMaaPMRSourcePluginConfig - topic name field is undefined: " + baseDMaaPMRPluginConfig); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java new file mode 100644 index 0000000..b01f0b4 --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java @@ -0,0 +1,58 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; + +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +/** + * Validates plugin config values in {@link DMaaPMRSinkPluginConfig} + * <p> + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRSinkPluginConfigValidator extends BaseDMaaPMRPluginConfigValidator<DMaaPMRSinkPluginConfig> { + + private static final long serialVersionUID = 1L; + + /** + * Validates plugin config values in {@link DMaaPMRSinkPluginConfig} + * + * @param sinkPluginConfig Sink Plugin Config + * + * @return Validation response containing validation errors if any + */ + @Override + public GenericValidationResponse<DMaaPMRSinkPluginConfig> validateAppSettings( + final DMaaPMRSinkPluginConfig sinkPluginConfig) { + + // validate settings in BaseDMaaPMRPluginConfig + final GenericValidationResponse<DMaaPMRSinkPluginConfig> validationResponse = + super.validateAppSettings(sinkPluginConfig); + + if (ValidationUtils.isEmpty(sinkPluginConfig.getMessageColumnName())) { + validationResponse.addErrorMessage("messageColumn Name", + "DMaaPMRSinkPluginConfig - message column name field is undefined: " + sinkPluginConfig); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java new file mode 100644 index 0000000..56a658c --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java @@ -0,0 +1,58 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; + +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +/** + * Validates plugin config values in {@link DMaaPMRSourcePluginConfig} + * <p> + * @author Rajiv Singla . Creation Date: 1/30/2017. + */ +public class DMaaPMRSourcePluginConfigValidator extends BaseDMaaPMRPluginConfigValidator<DMaaPMRSourcePluginConfig> { + + private static final long serialVersionUID = 1L; + + /** + * Validates plugin config values in {@link DMaaPMRSourcePluginConfig} + * + * @param sourcePluginConfig Source Plugin Config + * + * @return Validation response containing validation errors if any + */ + @Override + public GenericValidationResponse<DMaaPMRSourcePluginConfig> validateAppSettings( + final DMaaPMRSourcePluginConfig sourcePluginConfig) { + + // validate settings in BaseDMaaPMRPluginConfig + final GenericValidationResponse<DMaaPMRSourcePluginConfig> validationResponse = + super.validateAppSettings(sourcePluginConfig); + + if (sourcePluginConfig.getPollingInterval() == null) { + validationResponse.addErrorMessage( + "port Number", + "DMaaPMRSourcePluginConfig - polling interval is undefined: " + sourcePluginConfig); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java new file mode 100644 index 0000000..ff2f18b --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java @@ -0,0 +1,83 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; + +import co.cask.cdap.api.data.schema.Schema; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +/** + * Validator to validate {@link JsonPathFilterPluginConfig} + * <p> + * @author Rajiv Singla . Creation Date: 3/2/2017. + */ +public class JsonPathFilterPluginConfigValidator implements CDAPAppSettingsValidator<JsonPathFilterPluginConfig, + GenericValidationResponse<JsonPathFilterPluginConfig>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<JsonPathFilterPluginConfig> validateAppSettings( + final JsonPathFilterPluginConfig jsonPathFilterPluginConfig) { + + final GenericValidationResponse<JsonPathFilterPluginConfig> validationResponse = + new GenericValidationResponse<>(); + + final String jsonFilterMappings = jsonPathFilterPluginConfig.getJsonFilterMappings(); + if (ValidationUtils.isEmpty(jsonFilterMappings)) { + + validationResponse.addErrorMessage("JsonFilterMappings", "Json Filter Mappings must be present"); + } + + + final String matchedField = jsonPathFilterPluginConfig.getOutputSchemaFieldName(); + final String outputSchemaJson = jsonPathFilterPluginConfig.getSchema(); + + if (ValidationUtils.isEmpty(outputSchemaJson)) { + + validationResponse.addErrorMessage("output schema", "Output schema is not present"); + + } else { + + // validate matched output field type is boolean + if (matchedField != null && + !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.BOOLEAN)) { + validationResponse.addErrorMessage("OutputSchemaFieldName", + String.format( + "OutputSchemaFieldName: %s must be marked as boolean type", matchedField)); + } + + // validate matched output field type is nullable + if (matchedField != null && + !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.NULL)) { + validationResponse.addErrorMessage("OutputSchemaFieldName", + String.format( + "OutputSchemaFieldName: %s must be marked as nullable type", matchedField)); + } + + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java new file mode 100644 index 0000000..e0942ff --- /dev/null +++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java @@ -0,0 +1,91 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; + +import co.cask.cdap.api.data.schema.Schema; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; +import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +/** + * Validator that validate {@link SimpleTCAPluginConfig} + * <p> + * @author Rajiv Singla . Creation Date: 2/21/2017. + */ +public class SimpleTCAPluginConfigValidator implements CDAPAppSettingsValidator<SimpleTCAPluginConfig, + GenericValidationResponse<SimpleTCAPluginConfig>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<SimpleTCAPluginConfig> validateAppSettings( + final SimpleTCAPluginConfig tcaPluginConfig) { + + final GenericValidationResponse<SimpleTCAPluginConfig> validationResponse = new GenericValidationResponse<>(); + + if (ValidationUtils.isEmpty(tcaPluginConfig.getVesMessageFieldName())) { + validationResponse.addErrorMessage("vesMessageFieldName", + "Missing VES Message Field Name from plugin incoming schema"); + } + + if (ValidationUtils.isEmpty(tcaPluginConfig.getPolicyJson())) { + validationResponse.addErrorMessage("policyJson", + "Missing tca Policy Json"); + } + + final String alertFieldValue = tcaPluginConfig.getAlertFieldName(); + final String alertFieldName = "alertFieldName"; + if (ValidationUtils.isEmpty(alertFieldValue)) { + validationResponse.addErrorMessage(alertFieldName, + "Missing alert Field Name that will be placed in plugin outgoing schema"); + } + + if (ValidationUtils.isEmpty(tcaPluginConfig.getMessageTypeFieldName())) { + validationResponse.addErrorMessage("messageTypeField", + "Missing message Type Field Name that will be placed in plugin outgoing schema"); + } + + + final String outputSchemaJson = tcaPluginConfig.getSchema(); + if (ValidationUtils.isEmpty(outputSchemaJson)) { + validationResponse.addErrorMessage("output schema", "Output schema is not present"); + } else { + // validate output schema - alert field name is of type string + if (alertFieldValue != null && + !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.STRING)) { + validationResponse.addErrorMessage(alertFieldName, + String.format( + "Alert Field Name: %s must be String type", alertFieldValue)); + } + // validate output schema - alert field name is nullable + if (alertFieldValue != null && + !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.NULL)) { + validationResponse.addErrorMessage(alertFieldName, + String.format( + "Alert Field Name: %s must be marked as nullable type", alertFieldValue)); + } + } + + return validationResponse; + } +} |