From c489a2eb22484e798c39a978bc8b61821b92322f Mon Sep 17 00:00:00 2001 From: an4828 Date: Mon, 22 Jan 2018 17:17:34 -0500 Subject: TCA: Replace any openecomp reference by onap Change-Id: I7c6d812ab5c1d7b30c63653d1974b0b1abc099be Signed-off-by: an4828 Issue-ID: DCAEGEN2-224 Signed-off-by: an4828 --- .../batch/sink/dmaap/DMaaPMROutputFormat.java | 94 ------- .../sink/dmaap/DMaaPMROutputFormatProvider.java | 116 -------- .../batch/sink/dmaap/DMaaPMRRecordWriter.java | 58 ---- .../cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java | 90 ------- .../cdap/plugins/common/PluginSchema.java | 37 --- .../config/dmaap/BaseDMaaPMRPluginConfig.java | 159 ----------- .../config/dmaap/DMaaPMRSinkPluginConfig.java | 101 ------- .../config/dmaap/DMaaPMRSourcePluginConfig.java | 134 ---------- .../config/filter/JsonPathFilterPluginConfig.java | 125 --------- .../domain/config/tca/SimpleTCAPluginConfig.java | 154 ----------- .../schema/dmaap/DMaaPSourceOutputSchema.java | 59 ----- .../plugins/sparkcompute/tca/SimpleTCAPlugin.java | 175 ------------ .../plugins/streaming/dmaap/DMaaPMRReceiver.java | 118 --------- .../plugins/streaming/dmaap/DMaaPMRSource.java | 70 ----- .../streaming/dmaap/MockDMaaPMRReceiver.java | 132 --------- .../plugins/streaming/dmaap/MockDMaaPMRSource.java | 73 ----- .../plugins/transform/filter/JsonPathFilter.java | 134 ---------- .../cdap/plugins/utils/CDAPPluginUtils.java | 295 --------------------- .../cdap/plugins/utils/DMaaPSinkConfigMapper.java | 112 -------- .../plugins/utils/DMaaPSourceConfigMapper.java | 118 --------- .../BaseDMaaPMRPluginConfigValidator.java | 72 ----- .../DMaaPMRSinkPluginConfigValidator.java | 58 ---- .../DMaaPMRSourcePluginConfigValidator.java | 58 ---- .../JsonPathFilterPluginConfigValidator.java | 83 ------ .../validator/SimpleTCAPluginConfigValidator.java | 91 ------- 25 files changed, 2716 deletions(-) delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java delete mode 100644 dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java (limited to 'dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod') diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java deleted file mode 100644 index fdb7975..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormat.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSinkConfigMapper; -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; - -import java.io.IOException; - -/** - * DMaaP MR Output format used by DMaaP MR Sink Plugin to create a MR Publisher and pass to custom {@link - * DMaaPMRRecordWriter} - *

- * @author Rajiv Singla . Creation Date: 1/27/2017. - */ -public class DMaaPMROutputFormat extends OutputFormat { - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, - InterruptedException { - final Configuration configuration = context.getConfiguration(); - final DMaaPMRPublisherConfig publisherConfig = DMaaPSinkConfigMapper.map(configuration); - final DMaaPMRPublisher publisher = DMaaPMRFactory.create().createPublisher(publisherConfig); - return new DMaaPMRRecordWriter(publisher); - } - - @Override - public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { - // do nothing - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return new NoOpOutputCommitter(); - } - - /** - * A dummy implementation for {@link OutputCommitter} that does nothing. - */ - protected static class NoOpOutputCommitter extends OutputCommitter { - - @Override - public void setupJob(JobContext jobContext) throws IOException { - // no op - } - - @Override - public void setupTask(TaskAttemptContext taskContext) throws IOException { - // no op - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { - return false; - } - - @Override - public void commitTask(TaskAttemptContext taskContext) throws IOException { - // no op - } - - @Override - public void abortTask(TaskAttemptContext taskContext) throws IOException { - // no op - } - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java deleted file mode 100644 index bec04b3..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMROutputFormatProvider.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; - -import co.cask.cdap.api.data.batch.OutputFormatProvider; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; - -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * DMaaP MR Output Format Provider used to create Batch Sink Plugin - *

- * @author Rajiv Singla . Creation Date: 1/27/2017. - */ -public class DMaaPMROutputFormatProvider implements OutputFormatProvider { - - private final Map sinkConfig; - - - public DMaaPMROutputFormatProvider(DMaaPMRSinkPluginConfig sinkPluginConfig) { - - // initialize Sink Config - with DMaaP MR Publisher config values - sinkConfig = new LinkedHashMap<>(); - - // Required fields for sink config - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.HOST_NAME, sinkPluginConfig.getHostName()); - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, sinkPluginConfig.getTopicName()); - - final Integer configPortNumber = sinkPluginConfig.getPortNumber(); - if (configPortNumber != null) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, configPortNumber.toString()); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, - AnalyticsConstants.DEFAULT_PORT_NUMBER.toString()); - } - - final String configProtocol = sinkPluginConfig.getProtocol(); - if (ValidationUtils.isPresent(configProtocol)) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, configProtocol); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, AnalyticsConstants.DEFAULT_PROTOCOL); - } - - - final String configUserName = sinkPluginConfig.getUserName(); - if (ValidationUtils.isPresent(configUserName)) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, configUserName); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, AnalyticsConstants.DEFAULT_USER_NAME); - } - - final String configUserPass = sinkPluginConfig.getUserPassword(); - if (ValidationUtils.isPresent(configUserPass)) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, configUserPass); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, AnalyticsConstants.DEFAULT_USER_PASSWORD); - } - - final String configContentType = sinkPluginConfig.getContentType(); - if (ValidationUtils.isPresent(configContentType)) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, configContentType); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, AnalyticsConstants.DEFAULT_CONTENT_TYPE); - } - - - final Integer configMaxBatchSize = sinkPluginConfig.getMaxBatchSize(); - if (configMaxBatchSize != null) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, configMaxBatchSize.toString()); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, - String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE)); - } - - final Integer configMaxRecoveryQueueSize = sinkPluginConfig.getMaxRecoveryQueueSize(); - if (configMaxRecoveryQueueSize != null) { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, configMaxRecoveryQueueSize.toString()); - } else { - sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, - String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE)); - } - - } - - @Override - public String getOutputFormatClassName() { - return DMaaPMROutputFormat.class.getName(); - } - - @Override - public Map getOutputFormatConfiguration() { - return sinkConfig; - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java deleted file mode 100644 index f9c99e2..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRRecordWriter.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; - -/** - * A simple implementation of {@link RecordWriter} which writes messages to DMaaP MR topic - *

- * @author Rajiv Singla . Creation Date: 1/27/2017. - */ -public class DMaaPMRRecordWriter extends RecordWriter { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRRecordWriter.class); - - private final DMaaPMRPublisher dMaaPMRPublisher; - - public DMaaPMRRecordWriter(DMaaPMRPublisher dMaaPMRPublisher) { - this.dMaaPMRPublisher = dMaaPMRPublisher; - } - - @Override - public void write(String message, NullWritable value) throws IOException, InterruptedException { - LOG.debug("Writing message to DMaaP MR Topic: {}", message); - dMaaPMRPublisher.publish(Arrays.asList(message)); - } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - dMaaPMRPublisher.flush(); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java deleted file mode 100644 index b92ecba..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/batch/sink/dmaap/DMaaPMRSink.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.api.data.batch.Output; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.data.schema.Schema; -import co.cask.cdap.api.dataset.lib.KeyValue; -import co.cask.cdap.etl.api.Emitter; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.batch.BatchSink; -import co.cask.cdap.etl.api.batch.BatchSinkContext; -import org.apache.hadoop.io.NullWritable; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSinkPluginConfigValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Rajiv Singla . Creation Date: 1/26/2017. - */ -@Plugin(type = BatchSink.PLUGIN_TYPE) -@Name("DMaaPMRSink") -@Description("A batch sink Plugin that publishes messages to DMaaP MR Topic.") -public class DMaaPMRSink extends BatchSink { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSink.class); - - private final DMaaPMRSinkPluginConfig pluginConfig; - - public DMaaPMRSink(final DMaaPMRSinkPluginConfig pluginConfig) { - LOG.debug("Creating DMaaP MR Sink Plugin with plugin Config: {}", pluginConfig); - this.pluginConfig = pluginConfig; - } - - @Override - public void configurePipeline(final PipelineConfigurer pipelineConfigurer) { - super.configurePipeline(pipelineConfigurer); - ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSinkPluginConfigValidator()); - // validates that input schema contains the field provided in Sink Message Column Name property - final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); - CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getMessageColumnName()); - } - - - @Override - public void prepareRun(BatchSinkContext context) throws Exception { - context.addOutput(Output.of(pluginConfig.getReferenceName(), new DMaaPMROutputFormatProvider(pluginConfig))); - } - - @Override - public void transform(StructuredRecord structuredRecord, - Emitter> emitter) throws Exception { - // get incoming message from structured record - final String incomingMessage = structuredRecord.get(pluginConfig.getMessageColumnName()); - - // if incoming messages does not have message column name log warning as it should not happen - if (incomingMessage == null) { - LOG.warn("Column Name: {}, contains no message.Skipped for DMaaP MR Publishing....", - pluginConfig.getMessageColumnName()); - } else { - - // emit the messages as key - emitter.emit(new KeyValue(incomingMessage, null)); - } - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java deleted file mode 100644 index e9afde9..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/common/PluginSchema.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.common; - -/** - * Contract interface for all DCAE Analytics Plugin Schemas - * - * @author Rajiv Singla . Creation Date: 1/25/2017. - */ -public interface PluginSchema { - - /** - * Provides column name that will be used in Schema Definition - * - * @return Column name that will be used in Schema Definition - */ - String getSchemaColumnName(); - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java deleted file mode 100644 index b63375c..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/BaseDMaaPMRPluginConfig.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import com.google.common.base.Objects; -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; - -import javax.annotation.Nullable; - -/** - * Base class for all DMaaP MR Configs - *

- * @author Rajiv Singla . Creation Date: 1/17/2017. - */ -public abstract class BaseDMaaPMRPluginConfig extends CDAPBasePluginConfig { - - @Description("DMaaP Message Router HostName") - @Macro - protected String hostName; - - @Description("DMaaP Message Router Host Port number. Defaults to Port 80") - @Nullable - @Macro - protected Integer portNumber; - - @Description("DMaaP Message Router Topic Name") - @Macro - protected String topicName; - - @Description("DMaaP Message Router HTTP Protocol e.g. HTTP or HTTPS. Defaults to HTTPS") - @Nullable - @Macro - protected String protocol; - - @Description("DMaaP Message Router User Name used for AAF Authentication. Defaults to no authentication") - @Nullable - @Macro - protected String userName; - - @Description("DMaaP Message Router User Password used for AAF Authentication. Defaults to no authentication") - @Nullable - @Macro - protected String userPassword; - - @Description("DMaaP Message Router Content Type. Defaults to 'application/json'") - @Nullable - @Macro - protected String contentType; - - - public BaseDMaaPMRPluginConfig(final String referenceName, final String hostName, final String topicName) { - this.referenceName = referenceName; - this.hostName = hostName; - this.topicName = topicName; - } - - /** - * Host Name for DMaaP MR Publisher or Subscriber - * - * @return host name - */ - public String getHostName() { - return hostName; - } - - /** - * Port Number for DMaaP MR Publisher or Subscriber - * - * @return port number - */ - @Nullable - public Integer getPortNumber() { - return portNumber; - } - - /** - * DMaaP MR Topic Name for Subscriber or Publisher - * - * @return topic name - */ - public String getTopicName() { - return topicName; - } - - - /** - * DMaaP MR HTTP or HTTPS protocol - * - * @return http or https protocol - */ - @Nullable - public String getProtocol() { - return protocol; - } - - /** - * User name used for DMaaP MR AAF Authentication - * - * @return User name for DMaaP MR AAF Authentication - */ - @Nullable - public String getUserName() { - return userName; - } - - /** - * User password used for DMaaP MR AAF Authentication - * - * @return User password used for DMaaP MR AAF Authentication - */ - @Nullable - public String getUserPassword() { - return userPassword; - } - - /** - * Content type used for DMaaP MR Topic e.g. 'application/json' - * - * @return content type for DMaaP MR Topic - */ - @Nullable - public String getContentType() { - return contentType; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("referenceName", referenceName) - .add("hostName", hostName) - .add("portNumber", portNumber) - .add("topicName", topicName) - .add("protocol", protocol) - .add("userName", userName) - .add("userPassword", "xxxx") - .add("contentType", contentType) - .toString(); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java deleted file mode 100644 index 454f384..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSinkPluginConfig.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import com.google.common.base.Objects; - -import javax.annotation.Nullable; - -/** - * DMaaP MR Publisher Config - *

- * @author Rajiv Singla . Creation Date: 1/17/2017. - */ -public class DMaaPMRSinkPluginConfig extends BaseDMaaPMRPluginConfig { - - private static final long serialVersionUID = 1L; - - @Description("Column name of input schema which contains the message that needs to be written to DMaaP MR Topic") - @Macro - protected String messageColumnName; - - @Description("DMaaP MR Publisher Max Batch Size. Defaults to no Batch") - @Nullable - @Macro - protected Integer maxBatchSize; - - @Description("DMaaP MR Publisher Recovery Queue Size. Default to 1000K messages which can be buffered in memory " + - "in case DMaaP MR Publisher is temporarily unavailable") - @Nullable - @Macro - protected Integer maxRecoveryQueueSize; - - // Required No Arg constructor - public DMaaPMRSinkPluginConfig() { - this(null, null, null, null); - } - - public DMaaPMRSinkPluginConfig(String referenceName, String hostName, String topicName, String messageColumnName) { - super(referenceName, hostName, topicName); - this.messageColumnName = messageColumnName; - } - - /** - * Column name of incoming Schema field that contains the message that needs to published to DMaaP MR Topic - * - * @return Column name of incoming schema which contains message that needs to published to DMaaP MR Topic - */ - public String getMessageColumnName() { - return messageColumnName; - } - - /** - * DMaaP MR Publisher Max Batch Size. - * - * @return DMaaP MR Publisher Max Batch Size - */ - @Nullable - public Integer getMaxBatchSize() { - return maxBatchSize; - } - - /** - * DMaaP MR Publisher Max Recovery Queue Size - * - * @return DMaaP MR Publisher Max Recovery Queue Size - */ - @Nullable - public Integer getMaxRecoveryQueueSize() { - return maxRecoveryQueueSize; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("super", super.toString()) - .add("messageColumnName", messageColumnName) - .add("maxBatchSize", maxBatchSize) - .add("maxRecoveryQueueSize", maxRecoveryQueueSize) - .toString(); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java deleted file mode 100644 index d3e966b..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/dmaap/DMaaPMRSourcePluginConfig.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import com.google.common.base.Objects; - -import javax.annotation.Nullable; - -/** - * DMaaP MR Subscriber Config - *

- * @author Rajiv Singla . Creation Date: 1/17/2017. - */ -public class DMaaPMRSourcePluginConfig extends BaseDMaaPMRPluginConfig { - - private static final long serialVersionUID = 1L; - - @Description("DMaaP MR Polling Interval in MS") - @Macro - protected Integer pollingInterval; - - @Description("DMaaP Message Router Subscriber Consumer ID. Defaults to some randomly created userID") - @Nullable - @Macro - protected String consumerId; - - @Description("DMaaP Message Router Subscriber Consumer Group. Defaults to some randomly created user Group") - @Nullable - @Macro - protected String consumerGroup; - - @Description("DMaaP Message Router Subscriber Timeout in MS. Defaults to no timeout") - @Nullable - @Macro - protected Integer timeoutMS; - - @Description("DMaaP Message Router Subscriber Message Limit. Defaults to no message limit") - @Nullable - @Macro - protected Integer messageLimit; - - // Required No Arg constructor - public DMaaPMRSourcePluginConfig() { - this(null, null, null, 0); - } - - public DMaaPMRSourcePluginConfig(String referenceName, String hostName, String topicName, Integer pollingInterval) { - super(referenceName, hostName, topicName); - this.pollingInterval = pollingInterval; - } - - /** - * DMaaP MR Subscriber Polling interval - * - * @return DMaaP MR Subscriber Polling interval - */ - public Integer getPollingInterval() { - return pollingInterval; - } - - /** - * DMaaP MR Subscriber Consumer ID - * - * @return DMaaP MR Subscriber Consumer ID - */ - @Nullable - public String getConsumerId() { - return consumerId; - } - - /** - * DMaaP MR Subscriber Consumer Group - * - * @return DMaaP MR Subscriber Consumer Group - */ - @Nullable - public String getConsumerGroup() { - return consumerGroup; - } - - /** - * DMaaP MR Subscriber Timeout in MS - * - * @return DMaaP MR Subscriber Timeout in MS - */ - @Nullable - public Integer getTimeoutMS() { - return timeoutMS; - } - - /** - * DMaaP MR Subscriber message limit - * - * @return DMaaP MR Subscriber Message limit - */ - @Nullable - public Integer getMessageLimit() { - return messageLimit; - } - - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("super", super.toString()) - .add("pollingInterval", pollingInterval) - .add("consumerId", consumerId) - .add("consumerGroup", consumerGroup) - .add("timeoutMS", timeoutMS) - .add("messageLimit", messageLimit) - .toString(); - } - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java deleted file mode 100644 index d8a224d..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/filter/JsonPathFilterPluginConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import co.cask.cdap.api.annotation.Name; -import com.google.common.base.Objects; -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; - -/** - * Configuration for Json Path Filter Plugin - * - * @author Rajiv Singla . Creation Date: 3/2/2017. - */ -public class JsonPathFilterPluginConfig extends CDAPBasePluginConfig { - - private static final long serialVersionUID = 1L; - - @Name("incomingJsonFieldName") - @Description("Input schema field name that contain JSON used for filtering") - @Macro - protected String incomingJsonFieldName; - - - @Name("outputSchemaFieldName") - @Description("Name of the nullable boolean schema field name that will contain result of the filter matching") - @Macro - protected String outputSchemaFieldName; - - - @Name("jsonFilterMappings") - @Macro - @Description("Filters incoming JSON based on given filter mappings - in terms of JSON path and expected values." + - "Right hand side contains JSON path. Left hand side contains semicolon (';') separated expected values " + - "for that JSON Path. If all provided JSON Path mappings and corresponding values matches - " + - "output schema field will be marked as true") - protected String jsonFilterMappings; - - - @Name("schema") - @Description("Output Schema") - protected String schema; - - - public JsonPathFilterPluginConfig(final String referenceName, final String incomingJsonFieldName, - final String outputSchemaFieldName, final String jsonFilterMappings, - final String schema) { - this.referenceName = referenceName; - this.incomingJsonFieldName = incomingJsonFieldName; - this.outputSchemaFieldName = outputSchemaFieldName; - this.jsonFilterMappings = jsonFilterMappings; - this.schema = schema; - } - - /** - * Provides incoming plugin schema field name which contains json used to apply filter - * - * @return name of incoming schema field containing JSON to be filtered - */ - public String getIncomingJsonFieldName() { - return incomingJsonFieldName; - } - - /** - * Provides plugin output schema filed name that will contain result of filter application - * It must be nullable and boolean type - * - * @return name of outgoing schema filed name that will contain filtering result - */ - public String getOutputSchemaFieldName() { - return outputSchemaFieldName; - } - - /** - * Provides JSON filter mappings. LHS contains JSON path value and RHS contains expected - * values separated by semicolon - * - * - * @return String for JSON filter mappings - */ - public String getJsonFilterMappings() { - return jsonFilterMappings; - } - - /** - * Output Schema - * - * @return output schema string - */ - public String getSchema() { - return schema; - } - - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("referenceName", referenceName) - .add("incomingJsonFieldName", incomingJsonFieldName) - .add("outputSchemaFieldName", outputSchemaFieldName) - .add("jsonFilterMappings", jsonFilterMappings) - .add("schema", schema) - .toString(); - } - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java deleted file mode 100644 index 4cdba6a..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/config/tca/SimpleTCAPluginConfig.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import com.google.common.base.Objects; -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBasePluginConfig; - -import javax.annotation.Nullable; - -/** - * Simple TCA Plugin Configuration - *

- * @author Rajiv Singla . Creation Date: 2/13/2017. - */ -public class SimpleTCAPluginConfig extends CDAPBasePluginConfig { - - private static final long serialVersionUID = 1L; - - @Description("Field name containing VES Message") - @Macro - protected String vesMessageFieldName; - - @Description("Policy JSON that need to be applied to VES Message") - @Macro - protected String policyJson; - - @Description("Name of the output field that will contain the alert") - @Macro - protected String alertFieldName; - - @Description("Name of the output field that will contain message type: INAPPLICABLE, COMPLIANT, NON_COMPLIANT") - @Macro - protected String messageTypeFieldName; - - @Description("Specifies the output schema") - protected String schema; - - @Description("Enables") - @Nullable - @Macro - protected Boolean enableAlertCEFFormat; - - - /** - * Creates an instance of TCA Plugin Configs - * - * @param vesMessageFieldName Ves message field name from incoming plugin schema - * @param policyJson TCA Policy Json String - * @param alertFieldName Alert field name that will be added in TCA plugin output schema - * @param messageTypeFieldName Message type field name that will be added in TCA plugin output schema - * @param schema TCA Plugin output schema - * @param enableAlertCEFFormat enables alert message to be formatted in VES format - */ - public SimpleTCAPluginConfig(final String vesMessageFieldName, final String policyJson, - final String alertFieldName, final String messageTypeFieldName, - final String schema, final Boolean enableAlertCEFFormat) { - this.vesMessageFieldName = vesMessageFieldName; - this.policyJson = policyJson; - this.alertFieldName = alertFieldName; - this.messageTypeFieldName = messageTypeFieldName; - this.schema = schema; - this.enableAlertCEFFormat = enableAlertCEFFormat; - } - - /** - * Name of the field containing VES Message - * - * @return VES Message field name - */ - public String getVesMessageFieldName() { - return vesMessageFieldName; - } - - /** - * Policy Json String - * - * @return Policy Json String - */ - public String getPolicyJson() { - return policyJson; - } - - - /** - * Alert Field name in outgoing schema - * - * @return alert field name in outgoing schema - */ - public String getAlertFieldName() { - return alertFieldName; - } - - /** - * Returns output schema string - * - * @return output schema string - */ - public String getSchema() { - return schema; - } - - /** - * Return TCA message type - INAPPLICABLE, COMPLIANT, NON_COMPLIANT - * - * @return tca message type - */ - public String getMessageTypeFieldName() { - return messageTypeFieldName; - } - - - /** - * Returns if Alert output in Common Event format - * - * @return true if alert output is in common event format - */ - @Nullable - public Boolean getEnableAlertCEFFormat() { - return enableAlertCEFFormat; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("referenceName", referenceName) - .add("vesMessageFieldName", vesMessageFieldName) - .add("policyJson", policyJson) - .add("alertFieldName", alertFieldName) - .add("messageTypeFieldName", messageTypeFieldName) - .add("schema", schema) - .add("enableAlertCEFFormat", true) - .toString(); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java deleted file mode 100644 index a3234c0..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/domain/schema/dmaap/DMaaPSourceOutputSchema.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap; - -import co.cask.cdap.api.data.schema.Schema; -import org.openecomp.dcae.apod.analytics.cdap.plugins.common.PluginSchema; - -/** - * Output Schema for DMaaP MR Source Plugin - * - * @author Rajiv Singla . Creation Date: 1/25/2017. - */ -public enum DMaaPSourceOutputSchema implements PluginSchema { - - TIMESTAMP("ts"), - RESPONSE_CODE("responseCode"), - RESPONSE_MESSAGE("responseMessage"), - FETCHED_MESSAGE("message"); - - private String schemaColumnName; - - DMaaPSourceOutputSchema(String schemaColumnName) { - this.schemaColumnName = schemaColumnName; - } - - @Override - public String getSchemaColumnName() { - return schemaColumnName; - } - - public static Schema getSchema() { - return Schema.recordOf( - "DMaaPMRSourcePluginResponse", - Schema.Field.of(TIMESTAMP.getSchemaColumnName(), Schema.of(Schema.Type.LONG)), - Schema.Field.of(RESPONSE_CODE.getSchemaColumnName(), Schema.of(Schema.Type.INT)), - Schema.Field.of(RESPONSE_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING)), - Schema.Field.of(FETCHED_MESSAGE.getSchemaColumnName(), Schema.of(Schema.Type.STRING)) - ); - } - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java deleted file mode 100644 index fb5fef5..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.data.format.StructuredRecord.Builder; -import co.cask.cdap.api.data.schema.Schema; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.StageMetrics; -import co.cask.cdap.etl.api.batch.SparkCompute; -import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.SimpleTCAPluginConfigValidator; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Rajiv Singla . Creation Date: 2/13/2017. - */ - -@Plugin(type = SparkCompute.PLUGIN_TYPE) -@Name("SimpleTCAPlugin") -@Description("Used to create TCA (Threshold Crossing Alert) based on given Policy") -@SuppressFBWarnings("SE_INNER_CLASS") -public class SimpleTCAPlugin extends SparkCompute { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPlugin.class); - private static final long serialVersionUID = 1L; - - private final SimpleTCAPluginConfig pluginConfig; - - /** - * Create an instance of Simple TCA Plugin with give Simple TCA Plugin Config - * - * @param pluginConfig Simple TCA Plugin Config - */ - public SimpleTCAPlugin(SimpleTCAPluginConfig pluginConfig) { - this.pluginConfig = pluginConfig; - LOG.info("Creating instance of Simple TCA Plugin with plugin config: {}", pluginConfig); - } - - @Override - public void configurePipeline(PipelineConfigurer pipelineConfigurer) { - super.configurePipeline(pipelineConfigurer); - ValidationUtils.validateSettings(pluginConfig, new SimpleTCAPluginConfigValidator()); - final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); - CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getVesMessageFieldName()); - CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema()); - } - - @Override - public JavaRDD transform(final SparkExecutionPluginContext context, - final JavaRDD input) throws Exception { - final StageMetrics metrics = context.getMetrics(); - - LOG.debug("Invoking Spark Transform for Simple TCA Plugin"); - return input.map(new Function() { - - @Override - public StructuredRecord call(StructuredRecord inputStructuredRecord) throws Exception { - TCACalculatorMessageType calculatorMessageType; - String alertMessage = null; - - // Get input structured record - final String cefMessage = inputStructuredRecord.get(pluginConfig.getVesMessageFieldName()); - - // Get TCA Policy - final TCAPolicy tcaPolicy = CDAPPluginUtils.readValue(pluginConfig.getPolicyJson(), TCAPolicy.class); - - // create initial processor context - final TCACEFProcessorContext initialProcessorContext = - new TCACEFProcessorContext(cefMessage, tcaPolicy); - - final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor(); - final TCACEFProcessorContext jsonProcessorContext = - jsonProcessor.processMessage(initialProcessorContext); - - if (jsonProcessorContext.getCEFEventListener() != null) { - - LOG.debug("Json to CEF parsing successful. Parsed object {}", - jsonProcessorContext.getCEFEventListener()); - - // compute violations - final TCACEFProcessorContext processorContextWithViolations = - TCAUtils.computeThresholdViolations(jsonProcessorContext); - - // if violation are found then create alert message - if (processorContextWithViolations.canProcessingContinue()) { - - alertMessage = TCAUtils.createTCAAlertString(processorContextWithViolations, - pluginConfig.getReferenceName(), pluginConfig.getEnableAlertCEFFormat()); - calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; - - LOG.debug("VES Threshold Violation Detected.An alert message is be generated: {}", - alertMessage); - - final MetricsPerEventName metricsPerEventName = - processorContextWithViolations.getMetricsPerEventName(); - if (metricsPerEventName != null - && metricsPerEventName.getThresholds() != null - && metricsPerEventName.getThresholds().get(0) != null) { - final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); - LOG.debug("CEF Message: {}, Violated Threshold: {}", cefMessage, violatedThreshold); - } - - metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); - - } else { - LOG.debug("No Threshold Violation Detected. No alert will be generated."); - calculatorMessageType = TCACalculatorMessageType.COMPLIANT; - metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); - } - - } else { - LOG.info("Unable to parse provided json message to CEF format. Invalid message: {}", cefMessage); - calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; - } - - LOG.debug("Calculator message type: {} for message: {}", calculatorMessageType, cefMessage); - - final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema()); - - // create new output record builder and copy any input record values to output record builder - final Builder outputRecordBuilder = - CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord); - - // add alert field - final Builder outputRecordBuilderWithAlertField = - CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder, - outputSchema, pluginConfig.getAlertFieldName(), alertMessage); - - // add message field type - final Builder outRecordBuilderWithMessageTypeField = - CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilderWithAlertField, - outputSchema, pluginConfig.getMessageTypeFieldName(), calculatorMessageType.toString()); - - return outRecordBuilderWithMessageTypeField.build(); - } - }); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java deleted file mode 100644 index 9822768..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; - -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.metrics.Metrics; -import com.google.common.base.Optional; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.receiver.Receiver; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records - *

- * @author Rajiv Singla . Creation Date: 1/19/2017. - */ -public class DMaaPMRReceiver extends Receiver { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRReceiver.class); - private static final long serialVersionUID = 1L; - - private final DMaaPMRSourcePluginConfig pluginConfig; - private final transient Metrics metrics; - - public DMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig, - final Metrics metrics) { - super(storageLevel); - this.pluginConfig = pluginConfig; - this.metrics = metrics; - LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig); - } - - @Override - public void onStart() { - - // create DMaaP MR Subscriber - final DMaaPMRSubscriber subscriber = - DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); - - // Start a new thread with indefinite loop until receiver is stopped - new Thread() { - @Override - public void run() { - while (!isStopped()) { - storeStructuredRecords(subscriber); - try { - final Integer pollingInterval = pluginConfig.getPollingInterval(); - LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); - TimeUnit.MILLISECONDS.sleep(pollingInterval); - } catch (InterruptedException e) { - final String errorMessage = String.format( - "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - } - }.start(); - - } - - @Override - public void onStop() { - LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig); - } - - /** - * Fetches records from DMaaP MR Subscriber and store them as structured records - * - * @param subscriber DMaaP MR Subscriber Instance - */ - public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) { - - LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic"); - - // Fetch messages from DMaaP MR Topic - final Optional> subscriberMessagesOptional = - DMaaPMRUtils.getSubscriberMessages(subscriber, metrics); - - // store records - if (subscriberMessagesOptional.isPresent()) { - final List messages = subscriberMessagesOptional.get(); - for (final String message : messages) { - store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(message)); - } - LOG.debug("Stored DMaaP Subscriber messages as Structured Records. Message count {}", messages.size()); - } - } - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java deleted file mode 100644 index 117c76e..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRSource.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.streaming.StreamingContext; -import co.cask.cdap.etl.api.streaming.StreamingSource; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema; -import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSourcePluginConfigValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * DMaaP MR Source Plugin which polls DMaaP MR topic at frequent intervals - *

- * @author Rajiv Singla . Creation Date: 1/18/2017. - */ -@Plugin(type = StreamingSource.PLUGIN_TYPE) -@Name("DMaaPMRSource") -@Description("Fetches DMaaP MR Messages at regular intervals") -public class DMaaPMRSource extends StreamingSource { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSource.class); - private static final long serialVersionUID = 1L; - - private final DMaaPMRSourcePluginConfig pluginConfig; - - public DMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) { - LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig); - this.pluginConfig = pluginConfig; - } - - @Override - public void configurePipeline(PipelineConfigurer pipelineConfigurer) { - ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSourcePluginConfigValidator()); - pipelineConfigurer.getStageConfigurer().setOutputSchema(DMaaPSourceOutputSchema.getSchema()); - } - - @Override - public JavaDStream getStream(final StreamingContext streamingContext) throws Exception { - return streamingContext.getSparkStreamingContext().receiverStream( - new DMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig, streamingContext.getMetrics())); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java deleted file mode 100644 index dc24ca0..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRReceiver.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; - -import co.cask.cdap.api.data.format.StructuredRecord; -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.receiver.Receiver; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue; -import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString; - -/** - * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records - *

- * @author Rajiv Singla . Creation Date: 1/19/2017. - */ -public class MockDMaaPMRReceiver extends Receiver { - - private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class); - private static final long serialVersionUID = 1L; - - private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json"; - private static final TypeReference> EVENT_LISTENER_TYPE_REFERENCE = - new TypeReference>() { - }; - - private final DMaaPMRSourcePluginConfig pluginConfig; - - public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) { - super(storageLevel); - this.pluginConfig = pluginConfig; - LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig); - } - - @Override - public void onStart() { - - // create DMaaP MR Subscriber - final DMaaPMRSubscriber subscriber = - DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); - storeStructuredRecords(subscriber); - - } - - @Override - public void onStop() { - LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig); - } - - /** - * Fetches records from DMaaP MR Subscriber and store them as structured records - * - * @param subscriber DMaaP MR Subscriber Instance - */ - public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) { - - LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic"); - - try (InputStream resourceAsStream = - Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) { - - if (resourceAsStream == null) { - LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION); - throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException()); - } - - List eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE); - - final int totalMessageCount = eventListeners.size(); - LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount); - - int i = 1; - for (EventListener eventListener : eventListeners) { - if (isStopped()) { - return; - } - final String eventListenerString = writeValueAsString(eventListener); - LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount); - store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString)); - i++; - try { - TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval()); - } catch (InterruptedException e) { - LOG.error("Error while sleeping"); - throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e); - } - - } - - LOG.debug("Finished writing mock messages to CDAP Stream"); - - } catch (IOException e) { - LOG.error("Error while parsing json file"); - throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e); - } - } - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java deleted file mode 100644 index e058fab..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/streaming/dmaap/MockDMaaPMRSource.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.streaming.StreamingContext; -import co.cask.cdap.etl.api.streaming.StreamingSource; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A mock implementation of DMaaP MR Receiver which sends mock ves messages - *

- * @author Rajiv Singla . Creation Date: 2/15/2017. - */ -@Plugin(type = StreamingSource.PLUGIN_TYPE) -@Name("MockDMaaPMRSource") -@Description("Fetches DMaaP MR Messages at regular intervals") -public class MockDMaaPMRSource extends StreamingSource { - - private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRSource.class); - private static final long serialVersionUID = 1L; - - private final DMaaPMRSourcePluginConfig pluginConfig; - - public MockDMaaPMRSource(final DMaaPMRSourcePluginConfig pluginConfig) { - LOG.debug("Creating DMaaP MR Source plugin with plugin Config: {}", pluginConfig); - this.pluginConfig = pluginConfig; - } - - @Override - public void configurePipeline(PipelineConfigurer pipelineConfigurer) { - final Integer pollingInterval = pluginConfig.getPollingInterval(); - if (pollingInterval == null) { - final String errorMessage = "Polling Interval field must be present"; - throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } else { - LOG.info("Mock Message will be send every ms: {}", pollingInterval); - } - } - - @Override - public JavaDStream getStream(final StreamingContext streamingContext) throws Exception { - return streamingContext.getSparkStreamingContext().receiverStream( - new MockDMaaPMRReceiver(StorageLevel.MEMORY_ONLY(), pluginConfig)); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java deleted file mode 100644 index 135a6c2..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/transform/filter/JsonPathFilter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.transform.filter; - -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.data.schema.Schema; -import co.cask.cdap.etl.api.Emitter; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.Transform; -import co.cask.cdap.etl.api.TransformContext; -import com.google.common.base.Splitter; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.JsonPathFilterPluginConfigValidator; -import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext; -import org.openecomp.dcae.apod.analytics.common.utils.MessageProcessorUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Set; - -/** - * Json Path filter Plugin filters incoming schema field based of given json path expected values - *

- * @author Rajiv Singla . Creation Date: 3/2/2017. - */ - -@Plugin(type = Transform.PLUGIN_TYPE) -@Name("JsonPathFilter") -@Description("Filters incoming schema field based of given json path expected values") -public class JsonPathFilter extends Transform { - - private static final Logger LOG = LoggerFactory.getLogger(JsonPathFilter.class); - - private final JsonPathFilterPluginConfig pluginConfig; - private final Map> jsonFilterPathMappings; - - public JsonPathFilter(final JsonPathFilterPluginConfig pluginConfig) { - this.pluginConfig = pluginConfig; - jsonFilterPathMappings = Maps.newHashMap(); - LOG.info("Created instance of Json Path Filter Plugin with plugin config: {}", pluginConfig); - } - - - @Override - public void initialize(final TransformContext context) throws Exception { - super.initialize(context); - populateJsonFilterMapping(); - } - - @Override - public void configurePipeline(final PipelineConfigurer pipelineConfigurer) { - super.configurePipeline(pipelineConfigurer); - ValidationUtils.validateSettings(pluginConfig, new JsonPathFilterPluginConfigValidator()); - final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); - CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getIncomingJsonFieldName()); - populateJsonFilterMapping(); - CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema()); - } - - @Override - public void transform(final StructuredRecord inputStructuredRecord, final Emitter emitter) - throws Exception { - - // get input json message - final String jsonMessage = inputStructuredRecord.get(pluginConfig.getIncomingJsonFieldName()); - - // process Json Filter Mappings - final JsonMessageFilterProcessorContext jsonMessageFilterProcessorContext = - MessageProcessorUtils.processJsonFilterMappings(jsonMessage, jsonFilterPathMappings); - - // create new output record builder and copy any input Structured record values to output record builder - final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema()); - final StructuredRecord.Builder outputRecordBuilder = - CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord); - - // add json filter matched field - final StructuredRecord.Builder outputRecordBuilderWithMatchedField = - CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder, - outputSchema, pluginConfig.getOutputSchemaFieldName(), - jsonMessageFilterProcessorContext.getMatched()); - - // emit structured record with filtering matched field - final StructuredRecord outputStructuredRecord = outputRecordBuilderWithMatchedField.build(); - - LOG.debug("Incoming Json Message: {}.Json Path Filter Output Matched Field: {}", jsonMessage, - outputStructuredRecord.get(pluginConfig.getOutputSchemaFieldName())); - - emitter.emit(outputStructuredRecord); - - } - - /** - * Populates Json Filter Mapping - */ - private void populateJsonFilterMapping() { - final Map fieldMappings = - CDAPPluginUtils.extractFieldMappings(pluginConfig.getJsonFilterMappings()); - if (fieldMappings.isEmpty()) { - throw new IllegalArgumentException("No Field Mapping found. Invalid Filter mapping configuration"); - } - final Splitter semiColonSplitter = Splitter.on(";"); - for (Map.Entry fieldMappingEntry : fieldMappings.entrySet()) { - jsonFilterPathMappings.put(fieldMappingEntry.getKey(), - Sets.newLinkedHashSet(semiColonSplitter.split(fieldMappingEntry.getValue()))); - } - LOG.info("Input Json Filter Mappings: {}", jsonFilterPathMappings); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java deleted file mode 100644 index 3ae1560..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; - -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.data.schema.Schema; -import co.cask.cdap.etl.api.PipelineConfigurer; -import com.google.common.base.Function; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.StringUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * @author Rajiv Singla . Creation Date: 1/26/2017. - */ -public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils { - - private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class); - - public static final Function SCHEMA_TO_TYPE_FUNCTION = new Function() { - @Override - public Schema.Type apply(@Nonnull Schema schema) { - return schema.getType(); - } - }; - - - - private CDAPPluginUtils() { - // private constructor - } - - /** - * Validates if CDAP Schema contains expected fields - * - * @param schema schema that need to be validated - * @param expectedFields fields that are expected to be in the schema - */ - - public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) { - - LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields)); - - if (schema == null) { - // If input schema is null then no validation possible - LOG.warn("Input Schema is null. No validation possible"); - } else { - // Check if expected fields are indeed present in the schema - for (String expectedField : expectedFields) { - final Schema.Field schemaField = schema.getField(expectedField); - if (schemaField == null) { - final String errorMessage = String.format( - "Unable to find expected field: %s, in schema: %s", expectedField, schema); - throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - } - LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema, - Arrays.toString(expectedFields)); - } - } - - - /** - * Creates a new Structured Record containing DMaaP MR fetched message - * - * @param message DMaaP MR fetch message - * - * @return Structured record containing DMaaP MR Message - */ - public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) { - StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema()); - recordBuilder - .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime()) - .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200) - .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK") - .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message); - return recordBuilder.build(); - } - - - /** - * Creates output StructuredRecord Builder which has copied values from input StructuredRecord - * - * @param outputSchema output Schema - * @param inputStructuredRecord input Structured Record - * - * @return output Structured Record builder with pre populated values from input structured record - */ - public static StructuredRecord.Builder createOutputStructuredRecordBuilder( - @Nonnull final Schema outputSchema, - @Nonnull final StructuredRecord inputStructuredRecord) { - - // Get input structured Record Schema - final Schema inputSchema = inputStructuredRecord.getSchema(); - // Create new instance of output Structured Record Builder from output Schema - final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema); - - // iterate over input fields and if output schema has field with same name copy the value to out record builder - for (Schema.Field inputField : inputSchema.getFields()) { - final String inputFieldName = inputField.getName(); - if (outputSchema.getField(inputFieldName) != null) { - outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName)); - } - } - - return outputStructuredRecordBuilder; - } - - - /** - * Adds Field value to StructuredRecord Builder if schema contains that field Name - * - * @param structuredRecordBuilder structured record builder - * @param structuredRecordSchema schema for structured record builder - * @param fieldName field name - * @param fieldValue field value - * - * @return structured record builder with populated field name and value if schema contains field name - */ - public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder( - @Nonnull final StructuredRecord.Builder structuredRecordBuilder, - @Nonnull final Schema structuredRecordSchema, - @Nonnull final String fieldName, - final Object fieldValue) { - - // check if schema contains field Name - if (structuredRecordSchema.getField(fieldName) != null) { - structuredRecordBuilder.set(fieldName, fieldValue); - } else { - LOG.info("Unable to populate value for field Name: {} with field value: {}. " + - "Schema Fields: {} does not contain field name: {}", - fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName); - } - - return structuredRecordBuilder; - } - - - /** - * Validates that given schema String has fieldName of expected type. If field does not exist in given schema - * then validation will pass with warning. If field does exist in given schema then this validation will return - * true if field type is same as expected type else false - * - * @param schemaString CDAP Plugin output or input schema string - * @param fieldName field name - * @param expectedFieldType expected schema field type - * - * @return true if field type matches expected field type else false. If field does not exist in - * give schema validation will pass but will generate a warning message - */ - public static boolean validateSchemaFieldType(@Nonnull final String schemaString, - @Nonnull final String fieldName, - @Nonnull final Schema.Type expectedFieldType) { - - try { - // parse given schema String - final Schema outputSchema = Schema.parseJson(schemaString); - final Schema.Field schemaField = outputSchema.getField(fieldName); - - // if given schema does contain field then validated fieldName type - if (schemaField != null) { - - final List schemas = new LinkedList<>(); - - // if it is a union type then grab all union schemas - if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) { - final List unionFieldSchemas = - outputSchema.getField(fieldName).getSchema().getUnionSchemas(); - schemas.addAll(unionFieldSchemas); - } else { - // if not union type the just get the field schema - final Schema fieldSchema = outputSchema.getField(fieldName).getSchema(); - schemas.add(fieldSchema); - } - - // get all schema types - final List fieldTypes = - Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION); - - // if all schema types does not contain expected field type then return false - if (!fieldTypes.contains(expectedFieldType)) { - LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}", - fieldName, expectedFieldType, outputSchema); - return false; - } - - // field type validation passed - LOG.debug("Successfully validated fieldName: {} is of expected Type: {}", - fieldName, expectedFieldType); - - return true; - - } else { - - // if field does not exist then the validation will pass but will generate warning message - LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}", - fieldName, outputSchema); - return true; - } - - } catch (IOException e) { - final String errorMessage = - String.format("Unable to parse schema: %s for field type validation. " + - "Field Name: %s, Expected Field Type: %s Exception: %s", - schemaString, fieldName, expectedFieldType, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - } - - - /** - * Parses provided schema String as Schema object and set it as output Schema format - * - * @param pipelineConfigurer plugin pipeline configurer - * @param schemaString schema String to be set as output schema - */ - public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) { - try { - final Schema outputSchema = Schema.parseJson(schemaString); - pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema); - } catch (IOException e) { - final String errorMessage = String.format( - "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e); - throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - } - - - /** - * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument - * exception will be thrown - * - * @param mappingFieldString field Mapping String - * - * @return map containing mapping key values - */ - public static Map extractFieldMappings(final String mappingFieldString) { - final Map fieldMappings = Maps.newHashMap(); - if (StringUtils.isNotBlank(mappingFieldString)) { - final Splitter commaSplitter = Splitter.on(","); - for (String fieldMapping : commaSplitter.split(mappingFieldString)) { - final String[] keyValueMappings = fieldMapping.split(":"); - if (keyValueMappings.length != 2 || - StringUtils.isBlank(keyValueMappings[0]) || - StringUtils.isBlank(keyValueMappings[1])) { - final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " + - "be present in mappings: " + mappingFieldString; - throw new DCAEAnalyticsRuntimeException( - errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim()); - } - } - return fieldMappings; - } - - - - -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java deleted file mode 100644 index 01dad7e..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; - -import com.google.common.base.Function; -import org.apache.hadoop.conf.Configuration; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; - -import javax.annotation.Nonnull; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; - -/** - * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig} - *

- * @author Rajiv Singla . Creation Date: 1/26/2017. - */ -public class DMaaPSinkConfigMapper implements Function { - - /** - * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig} - * - * @param sinkPluginConfig DMaaP Sink Plugin Config - * - * @return DMaaP MR Publisher Config - */ - public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) { - return new DMaaPSinkConfigMapper().apply(sinkPluginConfig); - } - - /** - * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig} - * - * @param configuration Hadoop Configuration containing DMaaP MR Sink field values - * - * @return DMaaP MR Publisher Config - */ - @Nonnull - @Override - public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) { - - // Create a new publisher settings builder - final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME); - final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME); - - if (isEmpty(hostName) || isEmpty(topicName)) { - throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present"); - } - - final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = - new DMaaPMRPublisherConfig.Builder(hostName, topicName); - - // Setup up any optional publisher parameters if they are present - final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER); - if (portNumber != null) { - publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber)); - } - - final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL); - if (isPresent(protocol)) { - publisherConfigBuilder.setProtocol(protocol); - } - - final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME); - if (isPresent(userName)) { - publisherConfigBuilder.setUserName(userName); - } - - final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS); - if (isPresent(userPassword)) { - publisherConfigBuilder.setUserPassword(userPassword); - } - - final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE); - if (isPresent(contentType)) { - publisherConfigBuilder.setContentType(contentType); - } - - final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE); - if (maxBatchSize != null) { - publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize)); - } - - final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE); - if (maxRecoveryQueueSize != null) { - publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize)); - } - - return publisherConfigBuilder.build(); - - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java deleted file mode 100644 index 2ae09be..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.utils; - -import com.google.common.base.Function; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; - -import javax.annotation.Nonnull; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; - -/** - * Function that converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} - *

- * @author Rajiv Singla . Creation Date: 1/18/2017. - */ -public class DMaaPSourceConfigMapper implements Function { - - /** - * Static factory method to map {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} - * - * @param pluginConfig DMaaP MR Souce Plugin Config - * - * @return DMaaP MR Subscriber Config - */ - public static DMaaPMRSubscriberConfig map(final DMaaPMRSourcePluginConfig pluginConfig) { - return new DMaaPSourceConfigMapper().apply(pluginConfig); - } - - /** - * Converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} object - * - * @param sourcePluginConfig DMaaP MR Source Plugin Config - * - * @return DMaaP MR Subscriber Config - */ - @Nonnull - @Override - public DMaaPMRSubscriberConfig apply(@Nonnull DMaaPMRSourcePluginConfig sourcePluginConfig) { - - // Create a new subscriber settings builder - final String hostName = sourcePluginConfig.getHostName(); - final String topicName = sourcePluginConfig.getTopicName(); - if (isEmpty(hostName) || isEmpty(topicName)) { - throw new IllegalStateException("DMaaP MR Source Host Name and Topic Name must be present"); - } - final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder( - hostName, topicName); - - // Setup up any optional subscriber parameters if they are present - final Integer subscriberHostPortNumber = sourcePluginConfig.getPortNumber(); - if (subscriberHostPortNumber != null) { - subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber); - } - - final String subscriberProtocol = sourcePluginConfig.getProtocol(); - if (isPresent(subscriberProtocol)) { - subscriberConfigBuilder.setProtocol(subscriberProtocol); - } - - final String subscriberUserName = sourcePluginConfig.getUserName(); - if (isPresent(subscriberUserName)) { - subscriberConfigBuilder.setUserName(subscriberUserName); - } - - final String subscriberUserPassword = sourcePluginConfig.getUserPassword(); - if (isPresent(subscriberUserPassword)) { - subscriberConfigBuilder.setUserPassword(subscriberUserPassword); - } - - final String subscriberContentType = sourcePluginConfig.getContentType(); - if (isPresent(subscriberContentType)) { - subscriberConfigBuilder.setContentType(subscriberContentType); - } - - final String subscriberConsumerId = sourcePluginConfig.getConsumerId(); - if (isPresent(subscriberConsumerId)) { - subscriberConfigBuilder.setConsumerId(subscriberConsumerId); - } - - final String subscriberConsumerGroup = sourcePluginConfig.getConsumerGroup(); - if (isPresent(subscriberConsumerGroup)) { - subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup); - } - - final Integer subscriberTimeoutMS = sourcePluginConfig.getTimeoutMS(); - if (subscriberTimeoutMS != null) { - subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS); - } - final Integer subscriberMessageLimit = sourcePluginConfig.getMessageLimit(); - if (subscriberMessageLimit != null) { - subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit); - } - - // return Subscriber config - return subscriberConfigBuilder.build(); - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java deleted file mode 100644 index 8cc818f..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/BaseDMaaPMRPluginConfigValidator.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; - -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.BaseDMaaPMRPluginConfig; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -/** - * Validates plugin config values which are common in DMaaP MR Configs - {@link BaseDMaaPMRPluginConfig} - *

- * @author Rajiv Singla . Creation Date: 1/23/2017. - * - * @param {@link BaseDMaaPMRPluginConfig} Sub classes - */ -public abstract class BaseDMaaPMRPluginConfigValidator implements - CDAPAppSettingsValidator> { - - private static final long serialVersionUID = 1L; - - /** - * Validates the {@link BaseDMaaPMRPluginConfig} parameters - * - * @param baseDMaaPMRPluginConfig DMaaP MR Plugin Config - * - * @return Validation Response containing validation errors if any - */ - @Override - public GenericValidationResponse validateAppSettings(final T baseDMaaPMRPluginConfig) { - - final GenericValidationResponse validationResponse = new GenericValidationResponse<>(); - - if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getHostName())) { - validationResponse.addErrorMessage( - "hostName", - "DMaaPMRPluginConfig - hostname field is undefined: " + baseDMaaPMRPluginConfig); - } - - if (baseDMaaPMRPluginConfig.getPortNumber() == null) { - validationResponse.addErrorMessage( - "port Number", - "DMaaPMRPluginConfig - host port number field is undefined: " + baseDMaaPMRPluginConfig); - } - - if (ValidationUtils.isEmpty(baseDMaaPMRPluginConfig.getTopicName())) { - validationResponse.addErrorMessage( - "topic Name", - "DMaaPMRSourcePluginConfig - topic name field is undefined: " + baseDMaaPMRPluginConfig); - } - - return validationResponse; - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java deleted file mode 100644 index c9b1df6..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSinkPluginConfigValidator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; - -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -/** - * Validates plugin config values in {@link DMaaPMRSinkPluginConfig} - *

- * @author Rajiv Singla . Creation Date: 1/30/2017. - */ -public class DMaaPMRSinkPluginConfigValidator extends BaseDMaaPMRPluginConfigValidator { - - private static final long serialVersionUID = 1L; - - /** - * Validates plugin config values in {@link DMaaPMRSinkPluginConfig} - * - * @param sinkPluginConfig Sink Plugin Config - * - * @return Validation response containing validation errors if any - */ - @Override - public GenericValidationResponse validateAppSettings( - final DMaaPMRSinkPluginConfig sinkPluginConfig) { - - // validate settings in BaseDMaaPMRPluginConfig - final GenericValidationResponse validationResponse = - super.validateAppSettings(sinkPluginConfig); - - if (ValidationUtils.isEmpty(sinkPluginConfig.getMessageColumnName())) { - validationResponse.addErrorMessage("messageColumn Name", - "DMaaPMRSinkPluginConfig - message column name field is undefined: " + sinkPluginConfig); - } - - return validationResponse; - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java deleted file mode 100644 index 15a7583..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/DMaaPMRSourcePluginConfigValidator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; - -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -/** - * Validates plugin config values in {@link DMaaPMRSourcePluginConfig} - *

- * @author Rajiv Singla . Creation Date: 1/30/2017. - */ -public class DMaaPMRSourcePluginConfigValidator extends BaseDMaaPMRPluginConfigValidator { - - private static final long serialVersionUID = 1L; - - /** - * Validates plugin config values in {@link DMaaPMRSourcePluginConfig} - * - * @param sourcePluginConfig Source Plugin Config - * - * @return Validation response containing validation errors if any - */ - @Override - public GenericValidationResponse validateAppSettings( - final DMaaPMRSourcePluginConfig sourcePluginConfig) { - - // validate settings in BaseDMaaPMRPluginConfig - final GenericValidationResponse validationResponse = - super.validateAppSettings(sourcePluginConfig); - - if (sourcePluginConfig.getPollingInterval() == null) { - validationResponse.addErrorMessage( - "port Number", - "DMaaPMRSourcePluginConfig - polling interval is undefined: " + sourcePluginConfig); - } - - return validationResponse; - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java deleted file mode 100644 index 428fedb..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/JsonPathFilterPluginConfigValidator.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; - -import co.cask.cdap.api.data.schema.Schema; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -/** - * Validator to validate {@link JsonPathFilterPluginConfig} - *

- * @author Rajiv Singla . Creation Date: 3/2/2017. - */ -public class JsonPathFilterPluginConfigValidator implements CDAPAppSettingsValidator> { - - private static final long serialVersionUID = 1L; - - @Override - public GenericValidationResponse validateAppSettings( - final JsonPathFilterPluginConfig jsonPathFilterPluginConfig) { - - final GenericValidationResponse validationResponse = - new GenericValidationResponse<>(); - - final String jsonFilterMappings = jsonPathFilterPluginConfig.getJsonFilterMappings(); - if (ValidationUtils.isEmpty(jsonFilterMappings)) { - - validationResponse.addErrorMessage("JsonFilterMappings", "Json Filter Mappings must be present"); - } - - - final String matchedField = jsonPathFilterPluginConfig.getOutputSchemaFieldName(); - final String outputSchemaJson = jsonPathFilterPluginConfig.getSchema(); - - if (ValidationUtils.isEmpty(outputSchemaJson)) { - - validationResponse.addErrorMessage("output schema", "Output schema is not present"); - - } else { - - // validate matched output field type is boolean - if (matchedField != null && - !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.BOOLEAN)) { - validationResponse.addErrorMessage("OutputSchemaFieldName", - String.format( - "OutputSchemaFieldName: %s must be marked as boolean type", matchedField)); - } - - // validate matched output field type is nullable - if (matchedField != null && - !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, matchedField, Schema.Type.NULL)) { - validationResponse.addErrorMessage("OutputSchemaFieldName", - String.format( - "OutputSchemaFieldName: %s must be marked as nullable type", matchedField)); - } - - } - - return validationResponse; - } -} diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java deleted file mode 100644 index 97f3f24..0000000 --- a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/validator/SimpleTCAPluginConfigValidator.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.plugins.validator; - -import co.cask.cdap.api.data.schema.Schema; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig; -import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -/** - * Validator that validate {@link SimpleTCAPluginConfig} - *

- * @author Rajiv Singla . Creation Date: 2/21/2017. - */ -public class SimpleTCAPluginConfigValidator implements CDAPAppSettingsValidator> { - - private static final long serialVersionUID = 1L; - - @Override - public GenericValidationResponse validateAppSettings( - final SimpleTCAPluginConfig tcaPluginConfig) { - - final GenericValidationResponse validationResponse = new GenericValidationResponse<>(); - - if (ValidationUtils.isEmpty(tcaPluginConfig.getVesMessageFieldName())) { - validationResponse.addErrorMessage("vesMessageFieldName", - "Missing VES Message Field Name from plugin incoming schema"); - } - - if (ValidationUtils.isEmpty(tcaPluginConfig.getPolicyJson())) { - validationResponse.addErrorMessage("policyJson", - "Missing tca Policy Json"); - } - - final String alertFieldValue = tcaPluginConfig.getAlertFieldName(); - final String alertFieldName = "alertFieldName"; - if (ValidationUtils.isEmpty(alertFieldValue)) { - validationResponse.addErrorMessage(alertFieldName, - "Missing alert Field Name that will be placed in plugin outgoing schema"); - } - - if (ValidationUtils.isEmpty(tcaPluginConfig.getMessageTypeFieldName())) { - validationResponse.addErrorMessage("messageTypeField", - "Missing message Type Field Name that will be placed in plugin outgoing schema"); - } - - - final String outputSchemaJson = tcaPluginConfig.getSchema(); - if (ValidationUtils.isEmpty(outputSchemaJson)) { - validationResponse.addErrorMessage("output schema", "Output schema is not present"); - } else { - // validate output schema - alert field name is of type string - if (alertFieldValue != null && - !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.STRING)) { - validationResponse.addErrorMessage(alertFieldName, - String.format( - "Alert Field Name: %s must be String type", alertFieldValue)); - } - // validate output schema - alert field name is nullable - if (alertFieldValue != null && - !CDAPPluginUtils.validateSchemaFieldType(outputSchemaJson, alertFieldValue, Schema.Type.NULL)) { - validationResponse.addErrorMessage(alertFieldName, - String.format( - "Alert Field Name: %s must be marked as nullable type", alertFieldValue)); - } - } - - return validationResponse; - } -} -- cgit 1.2.3-korg