diff options
author | an4828 <nekrassov@att.com> | 2017-09-26 14:35:17 -0400 |
---|---|---|
committer | an4828 <nekrassov@att.com> | 2017-09-26 14:35:24 -0400 |
commit | 06044df56fb07f4b368888581752855595e7b147 (patch) | |
tree | 6400a3a6ede762887861a621b7fdbfadd25190d5 /dcae-analytics-cdap-tca/src/main/java | |
parent | 475cb8c867038acd73ff540173d54bac3947c610 (diff) |
TCA: Support for VES/A&AI enrichment
Change-Id: I75e0f8e034b9334e918304739e4d73dd12c1ff62
Signed-off-by: an4828 <nekrassov@att.com>
Issue-ID: DCAEGEN2-116
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java')
22 files changed, 2908 insertions, 2645 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java index 8c634a7..f0224a9 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java @@ -1,115 +1,116 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca; - -import co.cask.cdap.api.app.AbstractApplication; -import co.cask.cdap.api.data.stream.Stream; -import co.cask.cdap.api.dataset.DatasetProperties; -import co.cask.cdap.api.dataset.lib.ObjectMappedTable; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; -import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; -import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator; -import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker; -import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker; -import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Rajiv Singla . Creation Date: 10/21/2016. - */ -public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> { - - private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class); - - @Override - public void configure() { - - - // ========= Application configuration Setup ============== // - final TCAAppConfig tcaAppConfig = getConfig(); - - LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig); - - // Validate application configuration - ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator()); - - // App Setup - setName(tcaAppConfig.getAppName()); - setDescription(tcaAppConfig.getAppDescription()); - - // ========== Streams Setup ============== // - // Create DMaaP MR Subscriber CDAP output stream - final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName(); - LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName); - final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName, - CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM); - addStream(subscriberOutputStream); - - - // ============ Datasets Setup ======== // - // Create TCA Message Status Table - final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName(); - final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds(); - LOG.info("Creating TCA Message Status Table: {} with TTL: {}", - tcaVESMessageStatusTableName, messageStatusTableTTLSeconds); - final DatasetProperties messageStatusTableProperties = - TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds); - createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties); - - - // Create TCA Alerts Abatement Table - final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName(); - final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds(); - LOG.info("Creating Alerts Abatement Table: {} with TTL: {}", - tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds); - final DatasetProperties alertsAbatementTableProperties = - TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds); - createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties); - - // Create TCA VES Alerts Table - final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName(); - final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds(); - LOG.info("Creating TCA Alerts Table: {} with TTL: {}", - tcaVESAlertsTableName, alertsTableTTLSeconds); - final DatasetProperties alertTableProperties = - TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds); - createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties); - - // =========== Flow Setup ============= // - addFlow(new TCAVESCollectorFlow(tcaAppConfig)); - - // ========== Workers Setup =========== // - LOG.info("Creating TCA DMaaP Subscriber Worker"); - addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); - LOG.info("Creating TCA DMaaP Publisher Worker"); - addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName())); - // TODO: Remove this before going to production - addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); - } - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca;
+
+import co.cask.cdap.api.app.AbstractApplication;
+import co.cask.cdap.api.data.stream.Stream;
+import co.cask.cdap.api.dataset.DatasetProperties;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
+import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator;
+import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker;
+import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker;
+import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Rajiv Singla . Creation Date: 10/21/2016.
+ */
+public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class);
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void configure() {
+
+
+ // ========= Application configuration Setup ============== //
+ final TCAAppConfig tcaAppConfig = getConfig();
+
+ LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig);
+
+ // Validate application configuration
+ ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator());
+
+ // App Setup
+ setName(tcaAppConfig.getAppName());
+ setDescription(tcaAppConfig.getAppDescription());
+
+ // ========== Streams Setup ============== //
+ // Create DMaaP MR Subscriber CDAP output stream
+ final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName();
+ LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName);
+ final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName,
+ CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM);
+ addStream(subscriberOutputStream);
+
+
+ // ============ Datasets Setup ======== //
+ // Create TCA Message Status Table
+ final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName();
+ final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds();
+ LOG.info("Creating TCA Message Status Table: {} with TTL: {}",
+ tcaVESMessageStatusTableName, messageStatusTableTTLSeconds);
+ final DatasetProperties messageStatusTableProperties =
+ TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
+ createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
+
+
+ // Create TCA Alerts Abatement Table
+ final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName();
+ final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds();
+ LOG.info("Creating Alerts Abatement Table: {} with TTL: {}",
+ tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds);
+ final DatasetProperties alertsAbatementTableProperties =
+ TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds);
+ createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties);
+
+ // Create TCA VES Alerts Table
+ final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
+ final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
+ LOG.info("Creating TCA Alerts Table: {} with TTL: {}",
+ tcaVESAlertsTableName, alertsTableTTLSeconds);
+ final DatasetProperties alertTableProperties =
+ TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds);
+ createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties);
+
+ // =========== Flow Setup ============= //
+ addFlow(new TCAVESCollectorFlow(tcaAppConfig));
+
+ // ========== Workers Setup =========== //
+ LOG.info("Creating TCA DMaaP Subscriber Worker");
+ addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
+ LOG.info("Creating TCA DMaaP Publisher Worker");
+ addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName()));
+ // TODO: Remove this before going to production
+ addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java index 4df35e7..1bb31a1 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java @@ -1,76 +1,82 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.flow; - -import co.cask.cdap.api.flow.AbstractFlow; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet; -import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet; -import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet; -import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; - -/** - * TCA Flow for VES (Virtual Event Streaming) Collector Flow - * - * @author Rajiv Singla . Creation Date: 11/3/2016. - */ -public class TCAVESCollectorFlow extends AbstractFlow { - - private final TCAAppConfig tcaAppConfig; - - public TCAVESCollectorFlow(TCAAppConfig tcaAppConfig) { - this.tcaAppConfig = tcaAppConfig; - } - - @Override - protected void configure() { - - setName(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_NAME_FLOW); - setDescription(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_DESCRIPTION_FLOW); - - final TCAVESMessageRouterFlowlet messageRouterFlowlet = new TCAVESMessageRouterFlowlet(); - addFlowlet(messageRouterFlowlet); - - final TCAVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = - new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName()); - addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances()); - - final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet = - new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName()); - addFlowlet(tcavesAlertsAbatementFlowlet); - - final TCAVESAlertsSinkFlowlet alertsSinkFlowlet = - new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName()); - addFlowlet(alertsSinkFlowlet); - - - // connect DMaaP MR VES Subscriber output stream to VES Message Router Flowlet - connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet); - // connect message router to VES threshold calculator - connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet); - // connect VES threshold calculator flowlet to Alerts Abatement Flowlet - connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet); - // connect Alerts Abatement flowlet to Alerts Sink Flowlet - connect(tcavesAlertsAbatementFlowlet, alertsSinkFlowlet); - - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flow;
+
+import co.cask.cdap.api.flow.AbstractFlow;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAAIEnrichmentFlowlet;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
+
+/**
+ * TCA Flow for VES (Virtual Event Streaming) Collector Flow
+ *
+ * @author Rajiv Singla . Creation Date: 11/3/2016.
+ */
+public class TCAVESCollectorFlow extends AbstractFlow {
+
+ private final TCAAppConfig tcaAppConfig;
+
+ public TCAVESCollectorFlow(TCAAppConfig tcaAppConfig) {
+ this.tcaAppConfig = tcaAppConfig;
+ }
+
+ @Override
+ protected void configure() {
+
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_NAME_FLOW);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_DESCRIPTION_FLOW);
+
+ final TCAVESMessageRouterFlowlet messageRouterFlowlet = new TCAVESMessageRouterFlowlet();
+ addFlowlet(messageRouterFlowlet);
+
+ final TCAVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet =
+ new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName());
+ addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances());
+
+ final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet =
+ new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName());
+ addFlowlet(tcavesAlertsAbatementFlowlet);
+
+ final TCAVESAAIEnrichmentFlowlet tcavesaaiEnrichmentFlowlet = new TCAVESAAIEnrichmentFlowlet();
+ addFlowlet(tcavesaaiEnrichmentFlowlet);
+
+ final TCAVESAlertsSinkFlowlet alertsSinkFlowlet =
+ new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName());
+ addFlowlet(alertsSinkFlowlet);
+
+
+ // connect DMaaP MR VES Subscriber output stream to VES Message Router Flowlet
+ connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet);
+ // connect message router to VES threshold calculator
+ connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet);
+ // connect VES threshold calculator flowlet to Alerts Abatement Flowlet
+ connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet);
+ // connect Alerts Abatement flowlet to AAI Enrichment Flowlet
+ connect(tcavesAlertsAbatementFlowlet, tcavesaaiEnrichmentFlowlet);
+ // connect A&AI Enrichment flowlet to Alerts Sink Flowlet
+ connect(tcavesaaiEnrichmentFlowlet, alertsSinkFlowlet);
+
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java new file mode 100644 index 0000000..60e97b5 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java @@ -0,0 +1,128 @@ +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import org.openecomp.dcae.apod.analytics.aai.AAIClientFactory;
+import org.openecomp.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig;
+import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
+import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flowlet responsible for doing A&AI Enrichment
+ *
+ * @author Rajiv Singla . Creation Date: 9/20/2017.
+ */
+public class TCAVESAAIEnrichmentFlowlet extends AbstractFlowlet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAVESAAIEnrichmentFlowlet.class);
+
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT)
+ protected OutputEmitter<String> aaiEnrichmentOutputEmitter;
+
+ private TCAAppPreferences tcaAppPreferences;
+ private AAIEnrichmentClient aaiEnrichmentClient;
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_DESCRIPTION_FLOWLET);
+ }
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+ tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
+ if (tcaAppPreferences.getEnableAAIEnrichment()) {
+ final AAIHttpClientConfig aaiHttpClientConfig =
+ CDAPTCAUtils.createAAIEnrichmentClientConfig(tcaAppPreferences);
+ aaiEnrichmentClient = AAIClientFactory.create().getEnrichmentClient(aaiHttpClientConfig);
+ }
+ }
+
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
+ public void performAAIEnrichment(final String alertMessageString) throws IOException {
+
+ // if A&AI enrichment is disabled - no A&AI lookups are required
+ if (!tcaAppPreferences.getEnableAAIEnrichment()) {
+
+ LOG.debug("A&AI Enrichment is disabled. Skip A&AI Enrichment for alert: {}", alertMessageString);
+ aaiEnrichmentOutputEmitter.emit(alertMessageString);
+
+ } else {
+
+ // determine closed Loop Event Status
+ final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
+ final ClosedLoopEventStatus closedLoopEventStatus =
+ ClosedLoopEventStatus.valueOf(tcavesResponse.getClosedLoopEventStatus());
+
+ if (closedLoopEventStatus == ClosedLoopEventStatus.ONSET) {
+ LOG.debug("Performing A&AI Enrichment of ONSET Alert: {}", alertMessageString);
+ final ControlLoopSchemaType controlLoopSchemaType =
+ TCAUtils.determineControlLoopSchemaType(tcavesResponse);
+ final String sourceName = TCAUtils.determineSourceName(tcavesResponse);
+ LOG.debug("A&AI Source Name: {}, Control Loop Schema Type: {} for ONSET Alert: {}",
+ sourceName, controlLoopSchemaType, alertMessageString);
+
+ if (controlLoopSchemaType == ControlLoopSchemaType.VM) {
+ final String aaiVMEnrichmentAPIPath = tcaAppPreferences.getAaiVMEnrichmentAPIPath();
+ TCAUtils.doAAIVMEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVMEnrichmentAPIPath,
+ alertMessageString, sourceName);
+ } else {
+ final String aaiVNFEnrichmentAPIPath = tcaAppPreferences.getAaiVNFEnrichmentAPIPath();
+ TCAUtils.doAAIVNFEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVNFEnrichmentAPIPath,
+ alertMessageString, sourceName);
+ }
+
+ final String aaiEnrichedAlert = TCAUtils.writeValueAsString(tcavesResponse);
+ LOG.debug("Emitting Alert after A&AI Enrichment: {}", aaiEnrichedAlert);
+ aaiEnrichmentOutputEmitter.emit(alertMessageString);
+
+ // skip A&AI Enrichment of alerts with closed Loop Event Status - ABATED
+ } else if (closedLoopEventStatus == ClosedLoopEventStatus.ABATED) {
+ LOG.debug("Skipping Enrichment of Abated Alert: {}", alertMessageString);
+ aaiEnrichmentOutputEmitter.emit(alertMessageString);
+
+ } else {
+ // unsupported closed loop event status
+ final String errorMessage = String.format(
+ "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
+ "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+ }
+ }
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java index 543fc9e..1f9e9b4 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java @@ -1,163 +1,169 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; - -import co.cask.cdap.api.annotation.Output; -import co.cask.cdap.api.annotation.ProcessInput; -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.dataset.lib.ObjectMappedTable; -import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; -import co.cask.cdap.api.flow.flowlet.FlowletContext; -import co.cask.cdap.api.flow.flowlet.OutputEmitter; -import org.apache.commons.lang3.StringUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; -import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; - -/** - * Flowlet responsible to sending out abatement alerts - * - * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017. - */ -public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { - - private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class); - - @Property - private final String tcaAlertsAbatementTableName; - - @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) - protected OutputEmitter<String> alertsAbatementOutputEmitter; - - private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable; - - public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) { - this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName; - } - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET); - setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET); - } - - @Override - public void initialize(FlowletContext flowletContext) throws Exception { - super.initialize(flowletContext); - tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName); - } - - @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) - public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws Exception { - - final String cefMessage = thresholdCalculatorOutput.getCefMessage(); - final String alertMessageString = thresholdCalculatorOutput.getAlertMessage(); - final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName(); - - // alerts must have violated metrics per event name present - if (StringUtils.isBlank(violatedMetricsPerEventNameString)) { - final String errorMessage = String.format( - "No violated metricsPerEventName found for VES Message: %s." + - "Ignored alert message: %s", cefMessage, alertMessageString); - throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); - } - - final MetricsPerEventName violatedMetricsPerEventName = - TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class); - final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class); - final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); - final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); - final ControlLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus(); - - switch (closedLoopEventStatus) { - - case ONSET: - - LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage); - TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, - null, tcaAlertsAbatementTable); - LOG.info("Emitting ONSET alert: {}", alertMessageString); - alertsAbatementOutputEmitter.emit(alertMessageString); - break; - - case ABATED: - - LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold); - final TCAAlertsAbatementEntity previousAlertsAbatementEntry = - TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, - tcaAlertsAbatementTable); - - if (previousAlertsAbatementEntry != null) { - - LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry); - - final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS(); - if (abatementSentTS != null) { - LOG.debug("Abatement alert was already sent at timestamp: {}. " + - "Skip resending this abatement alert again", abatementSentTS); - } else { - - final long newAbatementSentTS = new Date().getTime(); - LOG.debug( - "No abatement alert was sent before." + - "Sending abatement alert:{} for the first time at:{}", - alertMessageString, newAbatementSentTS); - - // save new Abatement alert sent timestamp in table - TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, - Long.toString(newAbatementSentTS), tcaAlertsAbatementTable); - LOG.info("Emitting ABATED alert: {}", alertMessageString); - alertsAbatementOutputEmitter.emit(alertMessageString); - - } - - } else { - LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.", - alertMessageString); - } - - break; - - default: - - final String errorMessage = String.format( - "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + - "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); - throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); - - } - - - } - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Flowlet responsible to sending out abatement alerts
+ *
+ * @author Rajiv Singla . Creation Date: 9/11/2017.
+ */
+public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
+
+ @Property
+ private final String tcaAlertsAbatementTableName;
+
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
+ protected OutputEmitter<String> alertsAbatementOutputEmitter;
+
+ private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
+
+ public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
+ this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
+ }
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+ tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+ }
+
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
+ public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {
+
+ final String cefMessage = thresholdCalculatorOutput.getCefMessage();
+ final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
+ final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
+
+ // alerts must have violated metrics per event name present
+ if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
+ final String errorMessage = String.format(
+ "No violated metricsPerEventName found for VES Message: %s." +
+ "Ignored alert message: %s", cefMessage, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+ }
+
+ final MetricsPerEventName violatedMetricsPerEventName =
+ TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
+ final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
+ final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
+ final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
+ final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
+
+ switch (closedLoopEventStatus) {
+
+ case ONSET:
+
+ LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ null, tcaAlertsAbatementTable);
+ LOG.debug("Emitting ONSET alert: {}", alertMessageString);
+ alertsAbatementOutputEmitter.emit(alertMessageString);
+ break;
+
+ case ABATED:
+
+ LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
+ final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
+ TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
+ tcaAlertsAbatementTable);
+
+ if (previousAlertsAbatementEntry != null) {
+
+ LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
+
+ final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
+ if (abatementSentTS != null) {
+ LOG.debug("Abatement alert was already sent at timestamp: {}. " +
+ "Skip resending this abatement alert again", abatementSentTS);
+ } else {
+
+ final long newAbatementSentTS = new Date().getTime();
+ LOG.debug(
+ "No abatement alert was sent before." +
+ "Sending abatement alert:{} for the first time at:{}",
+ alertMessageString, newAbatementSentTS);
+
+ // save new Abatement alert sent timestamp in table
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+
+ // Set request id to be same as previous ONSET event request ID
+ tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
+ final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);
+
+ LOG.info("Emitting ABATED alert: {}", abatedAlertString);
+ alertsAbatementOutputEmitter.emit(abatedAlertString);
+
+ }
+
+ } else {
+ LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
+ alertMessageString);
+ }
+
+ break;
+
+ default:
+
+ final String errorMessage = String.format(
+ "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
+ "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+
+ }
+
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java index 90e8fc2..8f38ec2 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java @@ -1,71 +1,71 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; - -import co.cask.cdap.api.annotation.ProcessInput; -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.dataset.lib.ObjectMappedTable; -import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; -import co.cask.cdap.api.flow.flowlet.FlowletContext; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; - -/** - * Saves TCA VES Alert Messages in a Time series Table - * - * @author Rajiv Singla . Creation Date: 11/15/2016. - */ -public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet { - - @Property - private final String tcaVESAlertsTableName; - - private ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertsTable; - - public TCAVESAlertsSinkFlowlet(String tcaVESAlertsTableName) { - this.tcaVESAlertsTableName = tcaVESAlertsTableName; - } - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET); - setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET); - } - - @Override - public void initialize(FlowletContext flowletContext) throws Exception { - super.initialize(flowletContext); - tcaVESAlertsTable = getContext().getDataset(tcaVESAlertsTableName); - } - - /** - * Saves messages to Alerts table - * - * @param alertMessage alert message - */ - @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) - public void saveAlerts(String alertMessage) { - // Saves alert message in alerts table - TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable); - } - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
+
+/**
+ * Saves TCA VES Alert Messages in a Time series Table
+ *
+ * @author Rajiv Singla . Creation Date: 11/15/2016.
+ */
+public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet {
+
+ @Property
+ private final String tcaVESAlertsTableName;
+
+ private ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertsTable;
+
+ public TCAVESAlertsSinkFlowlet(String tcaVESAlertsTableName) {
+ this.tcaVESAlertsTableName = tcaVESAlertsTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET);
+ }
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+ tcaVESAlertsTable = getContext().getDataset(tcaVESAlertsTableName);
+ }
+
+ /**
+ * Saves messages to Alerts table
+ *
+ * @param alertMessage alert message
+ */
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT)
+ public void saveAlerts(String alertMessage) {
+ // Saves alert message in alerts table
+ TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable);
+ }
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java index b32173b..06b4f18 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java @@ -1,59 +1,59 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; - -import co.cask.cdap.api.annotation.Output; -import co.cask.cdap.api.annotation.ProcessInput; -import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; -import co.cask.cdap.api.flow.flowlet.OutputEmitter; -import co.cask.cdap.api.flow.flowlet.StreamEvent; -import com.google.common.base.Charsets; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; - - -/** - * TCA Message Router Flowlet emits VES Message to {@link TCAVESThresholdViolationCalculatorFlowlet} instances - * - * @author Rajiv Singla . Creation Date: 11/14/2016. - */ -public class TCAVESMessageRouterFlowlet extends AbstractFlowlet { - - /** - * Emits ves message to TCA Calculator Instances - */ - @Output(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) - protected OutputEmitter<String> vesMessageEmitter; - - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET); - setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET); - } - - @ProcessInput - public void routeVESMessage(StreamEvent vesMessageStreamEvent) { - final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString(); - vesMessageEmitter.emit( - vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import co.cask.cdap.api.flow.flowlet.StreamEvent;
+import com.google.common.base.Charsets;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+
+
+/**
+ * TCA Message Router Flowlet emits VES Message to {@link TCAVESThresholdViolationCalculatorFlowlet} instances
+ *
+ * @author Rajiv Singla . Creation Date: 11/14/2016.
+ */
+public class TCAVESMessageRouterFlowlet extends AbstractFlowlet {
+
+ /**
+ * Emits ves message to TCA Calculator Instances
+ */
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
+ protected OutputEmitter<String> vesMessageEmitter;
+
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET);
+ }
+
+ @ProcessInput
+ public void routeVESMessage(StreamEvent vesMessageStreamEvent) {
+ final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString();
+ vesMessageEmitter.emit(
+ vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode());
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java index a3a35cf..f895d70 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -1,156 +1,156 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; - -import co.cask.cdap.api.annotation.HashPartition; -import co.cask.cdap.api.annotation.Output; -import co.cask.cdap.api.annotation.ProcessInput; -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.dataset.lib.ObjectMappedTable; -import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; -import co.cask.cdap.api.flow.flowlet.FlowletContext; -import co.cask.cdap.api.flow.flowlet.OutputEmitter; -import co.cask.cdap.api.metrics.Metrics; -import com.fasterxml.jackson.core.JsonProcessingException; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister.persist; - -/** - * TCA VES Message Filter filters out messages which are not applicable for TCA as per TCA Policy - * - * @author Rajiv Singla . Creation Date: 11/3/2016. - */ -public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { - - private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); - - @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) - protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter; - protected Metrics metrics; - private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; - - @Property - private final String messageStatusTableName; - private Boolean enableAlertCEFFormat; - - private TCAPolicy tcaPolicy; - - /** - * Creates an instance of TCA VES Threshold violation calculator flowlet with give message status table name - * - * @param messageStatusTableName message status table name - */ - public TCAVESThresholdViolationCalculatorFlowlet(String messageStatusTableName) { - this.messageStatusTableName = messageStatusTableName; - } - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET); - setDescription(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET); - } - - - @Override - public void initialize(FlowletContext flowletContext) throws Exception { - super.initialize(flowletContext); - - // parse Runtime Arguments to tca policy preferences - tcaPolicy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext); - // Parse runtime arguments - final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); - enableAlertCEFFormat = tcaAppPreferences.getEnableAlertCEFFormat(); - vesMessageStatusTable = getContext().getDataset(messageStatusTableName); - - } - - /** - * Filters VES Messages that violates TCA Policy - * - * @param vesMessage VES Message - * @throws JsonProcessingException if alert message cannot be parsed into JSON object - */ - @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) - @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY) - public void filterVESMessages(String vesMessage) throws JsonProcessingException { - - TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; - String alertMessage = null; - - // Step 1: Filter incoming messages - final TCACEFProcessorContext processorContext = TCAUtils.filterCEFMessage(vesMessage, tcaPolicy); - - if (processorContext.canProcessingContinue()) { - - // Step 2: Check if CEF Message violate any thresholds - final TCACEFProcessorContext processorContextWithViolations = - TCAUtils.computeThresholdViolations(processorContext); - - if (processorContextWithViolations.canProcessingContinue()) { - - // Step 3: Create Alert Message - final String tcaAppName = getContext().getApplicationSpecification().getName(); - alertMessage = - TCAUtils.createTCAAlertString(processorContextWithViolations, tcaAppName, enableAlertCEFFormat); - calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; - LOG.debug("VES Threshold Violation Detected. An alert message is be generated. {}", alertMessage); - - metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); - - // Step 4: Emit message to Alert Sink Flowlet - final ThresholdCalculatorOutput thresholdCalculatorOutput = - new ThresholdCalculatorOutput(processorContext.getMessage(), - TCAUtils.writeValueAsString(processorContext.getTCAPolicy()), - TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()), - alertMessage); - tcaAlertOutputEmitter.emit(thresholdCalculatorOutput); - - } else { - - calculatorMessageType = TCACalculatorMessageType.COMPLIANT; - metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); - } - - } else { - - metrics.count(CDAPMetricsConstants.TCA_VES_INAPPLICABLE_MESSAGES_METRIC, 1); - } - - // save message to message status table - final int instanceId = getContext().getInstanceId(); - persist(processorContext, instanceId, calculatorMessageType, vesMessageStatusTable, alertMessage); - } - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.HashPartition;
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import co.cask.cdap.api.metrics.Metrics;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister.persist;
+
+/**
+ * TCA VES Message Filter filters out messages which are not applicable for TCA as per TCA Policy
+ *
+ * @author Rajiv Singla . Creation Date: 11/3/2016.
+ */
+public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class);
+
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
+ protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter;
+ protected Metrics metrics;
+ private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable;
+
+ @Property
+ private final String messageStatusTableName;
+ private Boolean enableAlertCEFFormat;
+
+ private TCAPolicy tcaPolicy;
+
+ /**
+ * Creates an instance of TCA VES Threshold violation calculator flowlet with give message status table name
+ *
+ * @param messageStatusTableName message status table name
+ */
+ public TCAVESThresholdViolationCalculatorFlowlet(String messageStatusTableName) {
+ this.messageStatusTableName = messageStatusTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET);
+ }
+
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+
+ // parse Runtime Arguments to tca policy preferences
+ tcaPolicy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext);
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
+ enableAlertCEFFormat = tcaAppPreferences.getEnableAlertCEFFormat();
+ vesMessageStatusTable = getContext().getDataset(messageStatusTableName);
+
+ }
+
+ /**
+ * Filters VES Messages that violates TCA Policy
+ *
+ * @param vesMessage VES Message
+ * @throws JsonProcessingException if alert message cannot be parsed into JSON object
+ */
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
+ @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY)
+ public void filterVESMessages(String vesMessage) throws JsonProcessingException {
+
+ TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
+ String alertMessage = null;
+
+ // Step 1: Filter incoming messages
+ final TCACEFProcessorContext processorContext = TCAUtils.filterCEFMessage(vesMessage, tcaPolicy);
+
+ if (processorContext.canProcessingContinue()) {
+
+ // Step 2: Check if CEF Message violate any thresholds
+ final TCACEFProcessorContext processorContextWithViolations =
+ TCAUtils.computeThresholdViolations(processorContext);
+
+ if (processorContextWithViolations.canProcessingContinue()) {
+
+ // Step 3: Create Alert Message
+ final String tcaAppName = getContext().getApplicationSpecification().getName();
+ alertMessage =
+ TCAUtils.createTCAAlertString(processorContextWithViolations, tcaAppName, enableAlertCEFFormat);
+ calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT;
+ LOG.debug("VES Threshold Violation Detected. An alert message is be generated. {}", alertMessage);
+
+ metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
+
+ // Step 4: Emit message to Alert Sink Flowlet
+ final ThresholdCalculatorOutput thresholdCalculatorOutput =
+ new ThresholdCalculatorOutput(processorContext.getMessage(),
+ TCAUtils.writeValueAsString(processorContext.getTCAPolicy()),
+ TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()),
+ alertMessage);
+ tcaAlertOutputEmitter.emit(thresholdCalculatorOutput);
+
+ } else {
+
+ calculatorMessageType = TCACalculatorMessageType.COMPLIANT;
+ metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1);
+ }
+
+ } else {
+
+ metrics.count(CDAPMetricsConstants.TCA_VES_INAPPLICABLE_MESSAGES_METRIC, 1);
+ }
+
+ // save message to message status table
+ final int instanceId = getContext().getInstanceId();
+ persist(processorContext, instanceId, calculatorMessageType, vesMessageStatusTable, alertMessage);
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java index 5d74f01..9d0b409 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java @@ -1,110 +1,110 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.settings; - -import com.google.common.base.Objects; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBaseAppConfig; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; - - -/** - * Contains CDAP App Config Settings for TCA Application - * - * @author Rajiv Singla . Creation Date: 11/2/2016. - */ -public class TCAAppConfig extends CDAPBaseAppConfig { - - - private static final long serialVersionUID = 1L; - - protected String tcaSubscriberOutputStreamName; - protected Integer thresholdCalculatorFlowletInstances; - - protected String tcaVESMessageStatusTableName; - protected Integer tcaVESMessageStatusTableTTLSeconds; - protected String tcaVESAlertsTableName; - protected Integer tcaVESAlertsTableTTLSeconds; - protected String tcaAlertsAbatementTableName; - protected Integer tcaAlertsAbatementTableTTLSeconds; - - - public TCAAppConfig() { - appName = CDAPComponentsConstants.TCA_DEFAULT_NAME_APP; - appDescription = CDAPComponentsConstants.TCA_DEFAULT_DESCRIPTION_APP; - tcaSubscriberOutputStreamName = CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM; - thresholdCalculatorFlowletInstances = AnalyticsConstants.TCA_DEFAULT_THRESHOLD_CALCULATOR_FLOWLET_INSTANCES; - tcaVESMessageStatusTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_NAME_TABLE; - tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE; - tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE; - tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE; - tcaAlertsAbatementTableName = CDAPComponentsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE; - tcaAlertsAbatementTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_TTL_TABLE; - } - - public String getTcaSubscriberOutputStreamName() { - return tcaSubscriberOutputStreamName; - } - - public String getTcaVESMessageStatusTableName() { - return tcaVESMessageStatusTableName; - } - - public Integer getTcaVESMessageStatusTableTTLSeconds() { - return tcaVESMessageStatusTableTTLSeconds; - } - - public String getTcaVESAlertsTableName() { - return tcaVESAlertsTableName; - } - - public Integer getTcaVESAlertsTableTTLSeconds() { - return tcaVESAlertsTableTTLSeconds; - } - - public Integer getThresholdCalculatorFlowletInstances() { - return thresholdCalculatorFlowletInstances; - } - - public String getTcaAlertsAbatementTableName() { - return tcaAlertsAbatementTableName; - } - - public Integer getTcaAlertsAbatementTableTTLSeconds() { - return tcaAlertsAbatementTableTTLSeconds; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("appName", appName) - .add("appDescription", appDescription) - .add("tcaSubscriberOutputStreamName", tcaSubscriberOutputStreamName) - .add("thresholdCalculatorFlowletInstances", thresholdCalculatorFlowletInstances) - .add("tcaVESMessageStatusTableName", tcaVESMessageStatusTableName) - .add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds) - .add("tcaVESAlertsTableName", tcaVESAlertsTableName) - .add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds) - .add("tcaAlertsAbatementTableName", tcaAlertsAbatementTableName) - .add("tcaAlertsAbatementTableTTLSeconds", tcaAlertsAbatementTableTTLSeconds) - .toString(); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.settings;
+
+import com.google.common.base.Objects;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBaseAppConfig;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+
+
+/**
+ * Contains CDAP App Config Settings for TCA Application
+ *
+ * @author Rajiv Singla . Creation Date: 11/2/2016.
+ */
+public class TCAAppConfig extends CDAPBaseAppConfig {
+
+
+ private static final long serialVersionUID = 1L;
+
+ protected String tcaSubscriberOutputStreamName;
+ protected Integer thresholdCalculatorFlowletInstances;
+
+ protected String tcaVESMessageStatusTableName;
+ protected Integer tcaVESMessageStatusTableTTLSeconds;
+ protected String tcaVESAlertsTableName;
+ protected Integer tcaVESAlertsTableTTLSeconds;
+ protected String tcaAlertsAbatementTableName;
+ protected Integer tcaAlertsAbatementTableTTLSeconds;
+
+
+ public TCAAppConfig() {
+ appName = CDAPComponentsConstants.TCA_DEFAULT_NAME_APP;
+ appDescription = CDAPComponentsConstants.TCA_DEFAULT_DESCRIPTION_APP;
+ tcaSubscriberOutputStreamName = CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM;
+ thresholdCalculatorFlowletInstances = AnalyticsConstants.TCA_DEFAULT_THRESHOLD_CALCULATOR_FLOWLET_INSTANCES;
+ tcaVESMessageStatusTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_NAME_TABLE;
+ tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE;
+ tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE;
+ tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE;
+ tcaAlertsAbatementTableName = CDAPComponentsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE;
+ tcaAlertsAbatementTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_TTL_TABLE;
+ }
+
+ public String getTcaSubscriberOutputStreamName() {
+ return tcaSubscriberOutputStreamName;
+ }
+
+ public String getTcaVESMessageStatusTableName() {
+ return tcaVESMessageStatusTableName;
+ }
+
+ public Integer getTcaVESMessageStatusTableTTLSeconds() {
+ return tcaVESMessageStatusTableTTLSeconds;
+ }
+
+ public String getTcaVESAlertsTableName() {
+ return tcaVESAlertsTableName;
+ }
+
+ public Integer getTcaVESAlertsTableTTLSeconds() {
+ return tcaVESAlertsTableTTLSeconds;
+ }
+
+ public Integer getThresholdCalculatorFlowletInstances() {
+ return thresholdCalculatorFlowletInstances;
+ }
+
+ public String getTcaAlertsAbatementTableName() {
+ return tcaAlertsAbatementTableName;
+ }
+
+ public Integer getTcaAlertsAbatementTableTTLSeconds() {
+ return tcaAlertsAbatementTableTTLSeconds;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appName", appName)
+ .add("appDescription", appDescription)
+ .add("tcaSubscriberOutputStreamName", tcaSubscriberOutputStreamName)
+ .add("thresholdCalculatorFlowletInstances", thresholdCalculatorFlowletInstances)
+ .add("tcaVESMessageStatusTableName", tcaVESMessageStatusTableName)
+ .add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds)
+ .add("tcaVESAlertsTableName", tcaVESAlertsTableName)
+ .add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds)
+ .add("tcaAlertsAbatementTableName", tcaAlertsAbatementTableName)
+ .add("tcaAlertsAbatementTableTTLSeconds", tcaAlertsAbatementTableTTLSeconds)
+ .toString();
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java index 2f86b37..2dadcf2 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java @@ -1,272 +1,349 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.settings; - -import com.google.common.base.Objects; -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; - -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS; - -/** - * <p> - * App Preferences for Analytics TCA (Threshold Crossing Alert) App - * <p> - * @author Rajiv Singla . Creation Date: 10/4/2016. - */ -public class TCAAppPreferences implements CDAPAppPreferences { - - private static final long serialVersionUID = 1L; - - // subscriber preferences - protected String subscriberHostName; - - protected Integer subscriberHostPort; - - protected String subscriberTopicName; - - protected String subscriberProtocol; - - protected String subscriberUserName; - - protected String subscriberUserPassword; - - protected String subscriberContentType; - - protected String subscriberConsumerId; - - protected String subscriberConsumerGroup; - - protected Integer subscriberTimeoutMS; - - protected Integer subscriberMessageLimit; - - protected Integer subscriberPollingInterval; - - // publisher preferences - protected String publisherHostName; - - protected Integer publisherHostPort; - - protected String publisherTopicName; - - protected String publisherProtocol; - - protected String publisherUserName; - - protected String publisherUserPassword; - - protected String publisherContentType; - - protected Integer publisherMaxBatchSize; - - protected Integer publisherMaxRecoveryQueueSize; - - protected Integer publisherPollingInterval; - - protected Boolean enableAlertCEFFormat; - - /** - * Default constructor to setup default values for TCA App Preferences - */ - public TCAAppPreferences() { - - // subscriber defaults - subscriberPollingInterval = TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS; - - // publisher defaults - publisherMaxBatchSize = TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE; - publisherMaxRecoveryQueueSize = TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; - publisherPollingInterval = TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS; - - enableAlertCEFFormat = false; - - } - - public String getSubscriberHostName() { - return subscriberHostName; - } - - public Integer getSubscriberHostPort() { - return subscriberHostPort; - } - - public String getSubscriberTopicName() { - return subscriberTopicName; - } - - public String getSubscriberProtocol() { - return subscriberProtocol; - } - - public String getSubscriberUserName() { - return subscriberUserName; - } - - public String getSubscriberUserPassword() { - return subscriberUserPassword; - } - - public String getSubscriberContentType() { - return subscriberContentType; - } - - public String getSubscriberConsumerId() { - return subscriberConsumerId; - } - - public String getSubscriberConsumerGroup() { - return subscriberConsumerGroup; - } - - public Integer getSubscriberTimeoutMS() { - return subscriberTimeoutMS; - } - - public Integer getSubscriberMessageLimit() { - return subscriberMessageLimit; - } - - public Integer getSubscriberPollingInterval() { - return subscriberPollingInterval; - } - - public String getPublisherHostName() { - return publisherHostName; - } - - public Integer getPublisherHostPort() { - return publisherHostPort; - } - - public String getPublisherTopicName() { - return publisherTopicName; - } - - public String getPublisherProtocol() { - return publisherProtocol; - } - - public String getPublisherUserName() { - return publisherUserName; - } - - public String getPublisherUserPassword() { - return publisherUserPassword; - } - - public String getPublisherContentType() { - return publisherContentType; - } - - public Integer getPublisherMaxBatchSize() { - return publisherMaxBatchSize; - } - - public Integer getPublisherMaxRecoveryQueueSize() { - return publisherMaxRecoveryQueueSize; - } - - public Integer getPublisherPollingInterval() { - return publisherPollingInterval; - } - - public Boolean getEnableAlertCEFFormat() { - return enableAlertCEFFormat; - } - - - public void setSubscriberHostName(String subscriberHostName) { - this.subscriberHostName = subscriberHostName; - } - - public void setSubscriberHostPort(Integer subscriberHostPort) { - this.subscriberHostPort = subscriberHostPort; - } - - public void setSubscriberTopicName(String subscriberTopicName) { - this.subscriberTopicName = subscriberTopicName; - } - - public void setSubscriberProtocol(String subscriberProtocol) { - this.subscriberProtocol = subscriberProtocol; - } - - public void setSubscriberUserName(String subscriberUserName) { - this.subscriberUserName = subscriberUserName; - } - - public void setSubscriberUserPassword(String subscriberUserPassword) { - this.subscriberUserPassword = subscriberUserPassword; - } - - public void setPublisherHostName(String publisherHostName) { - this.publisherHostName = publisherHostName; - } - - public void setPublisherHostPort(Integer publisherHostPort) { - this.publisherHostPort = publisherHostPort; - } - - public void setPublisherTopicName(String publisherTopicName) { - this.publisherTopicName = publisherTopicName; - } - - public void setPublisherProtocol(String publisherProtocol) { - this.publisherProtocol = publisherProtocol; - } - - public void setPublisherUserName(String publisherUserName) { - this.publisherUserName = publisherUserName; - } - - public void setPublisherUserPassword(String publisherUserPassword) { - this.publisherUserPassword = publisherUserPassword; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("subscriberHostName", subscriberHostName) - .add("subscriberHostPort", subscriberHostPort) - .add("subscriberTopicName", subscriberTopicName) - .add("subscriberProtocol", subscriberProtocol) - .add("subscriberUserName", subscriberUserName) - .add("subscriberContentType", subscriberContentType) - .add("subscriberConsumerId", subscriberConsumerId) - .add("subscriberConsumerGroup", subscriberConsumerGroup) - .add("subscriberTimeoutMS", subscriberTimeoutMS) - .add("subscriberMessageLimit", subscriberMessageLimit) - .add("subscriberPollingInterval", subscriberPollingInterval) - .add("publisherHostName", publisherHostName) - .add("publisherHostPort", publisherHostPort) - .add("publisherTopicName", publisherTopicName) - .add("publisherProtocol", publisherProtocol) - .add("publisherUserName", publisherUserName) - .add("publisherContentType", publisherContentType) - .add("publisherMaxBatchSize", publisherMaxBatchSize) - .add("publisherMaxRecoveryQueueSize", publisherMaxRecoveryQueueSize) - .add("publisherPollingInterval", publisherPollingInterval) - .toString(); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.settings;
+
+import com.google.common.base.Objects;
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+
+/**
+ * <p>
+ * App Preferences for Analytics TCA (Threshold Crossing Alert) App
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/4/2016.
+ */
+public class TCAAppPreferences implements CDAPAppPreferences {
+
+ private static final long serialVersionUID = 1L;
+
+ // subscriber preferences
+ protected String subscriberHostName;
+
+ protected Integer subscriberHostPort;
+
+ protected String subscriberTopicName;
+
+ protected String subscriberProtocol;
+
+ protected String subscriberUserName;
+
+ protected String subscriberUserPassword;
+
+ protected String subscriberContentType;
+
+ protected String subscriberConsumerId;
+
+ protected String subscriberConsumerGroup;
+
+ protected Integer subscriberTimeoutMS;
+
+ protected Integer subscriberMessageLimit;
+
+ protected Integer subscriberPollingInterval;
+
+ // publisher preferences
+ protected String publisherHostName;
+
+ protected Integer publisherHostPort;
+
+ protected String publisherTopicName;
+
+ protected String publisherProtocol;
+
+ protected String publisherUserName;
+
+ protected String publisherUserPassword;
+
+ protected String publisherContentType;
+
+ protected Integer publisherMaxBatchSize;
+
+ protected Integer publisherMaxRecoveryQueueSize;
+
+ protected Integer publisherPollingInterval;
+
+ protected Boolean enableAlertCEFFormat;
+
+
+ // A&AI Enrichment
+
+ protected Boolean enableAAIEnrichment;
+
+ protected String aaiEnrichmentHost;
+
+ protected Integer aaiEnrichmentPortNumber;
+
+ protected String aaiEnrichmentProtocol;
+
+ protected String aaiEnrichmentUserName;
+
+ protected String aaiEnrichmentUserPassword;
+
+ protected Boolean aaiEnrichmentIgnoreSSLCertificateErrors;
+
+ protected String aaiVNFEnrichmentAPIPath;
+
+ protected String aaiVMEnrichmentAPIPath;
+
+
+ // A&AI Enrichment Proxy
+
+ protected String aaiEnrichmentProxyURL;
+
+ /**
+ * Default constructor to setup default values for TCA App Preferences
+ */
+ public TCAAppPreferences() {
+
+ // subscriber defaults
+ subscriberPollingInterval = AnalyticsConstants.TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS;
+
+ // publisher defaults
+ publisherMaxBatchSize = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE;
+ publisherMaxRecoveryQueueSize = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE;
+ publisherPollingInterval = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS;
+
+ enableAlertCEFFormat = AnalyticsConstants.TCA_DEFAULT_ENABLE_CEF_FORMATTED_ALERT;
+
+ enableAAIEnrichment = AnalyticsConstants.TCA_DEFAULT_ENABLE_AAI_ENRICHMENT;
+ aaiEnrichmentIgnoreSSLCertificateErrors =
+ AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_IGNORE_SSL_CERTIFICATE_ERRORS;
+ aaiEnrichmentProxyURL = AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_PROXY_URL;
+
+ }
+
+ public String getSubscriberHostName() {
+ return subscriberHostName;
+ }
+
+ public Integer getSubscriberHostPort() {
+ return subscriberHostPort;
+ }
+
+ public String getSubscriberTopicName() {
+ return subscriberTopicName;
+ }
+
+ public String getSubscriberProtocol() {
+ return subscriberProtocol;
+ }
+
+ public String getSubscriberUserName() {
+ return subscriberUserName;
+ }
+
+ public String getSubscriberUserPassword() {
+ return subscriberUserPassword;
+ }
+
+ public String getSubscriberContentType() {
+ return subscriberContentType;
+ }
+
+ public String getSubscriberConsumerId() {
+ return subscriberConsumerId;
+ }
+
+ public String getSubscriberConsumerGroup() {
+ return subscriberConsumerGroup;
+ }
+
+ public Integer getSubscriberTimeoutMS() {
+ return subscriberTimeoutMS;
+ }
+
+ public Integer getSubscriberMessageLimit() {
+ return subscriberMessageLimit;
+ }
+
+ public Integer getSubscriberPollingInterval() {
+ return subscriberPollingInterval;
+ }
+
+ public String getPublisherHostName() {
+ return publisherHostName;
+ }
+
+ public Integer getPublisherHostPort() {
+ return publisherHostPort;
+ }
+
+ public String getPublisherTopicName() {
+ return publisherTopicName;
+ }
+
+ public String getPublisherProtocol() {
+ return publisherProtocol;
+ }
+
+ public String getPublisherUserName() {
+ return publisherUserName;
+ }
+
+ public String getPublisherUserPassword() {
+ return publisherUserPassword;
+ }
+
+ public String getPublisherContentType() {
+ return publisherContentType;
+ }
+
+ public Integer getPublisherMaxBatchSize() {
+ return publisherMaxBatchSize;
+ }
+
+ public Integer getPublisherMaxRecoveryQueueSize() {
+ return publisherMaxRecoveryQueueSize;
+ }
+
+ public Integer getPublisherPollingInterval() {
+ return publisherPollingInterval;
+ }
+
+ public Boolean getEnableAlertCEFFormat() {
+ return enableAlertCEFFormat;
+ }
+
+
+ public void setSubscriberHostName(String subscriberHostName) {
+ this.subscriberHostName = subscriberHostName;
+ }
+
+ public void setSubscriberHostPort(Integer subscriberHostPort) {
+ this.subscriberHostPort = subscriberHostPort;
+ }
+
+ public void setSubscriberTopicName(String subscriberTopicName) {
+ this.subscriberTopicName = subscriberTopicName;
+ }
+
+ public void setSubscriberProtocol(String subscriberProtocol) {
+ this.subscriberProtocol = subscriberProtocol;
+ }
+
+ public void setSubscriberUserName(String subscriberUserName) {
+ this.subscriberUserName = subscriberUserName;
+ }
+
+ public void setSubscriberUserPassword(String subscriberUserPassword) {
+ this.subscriberUserPassword = subscriberUserPassword;
+ }
+
+ public void setPublisherHostName(String publisherHostName) {
+ this.publisherHostName = publisherHostName;
+ }
+
+ public void setPublisherHostPort(Integer publisherHostPort) {
+ this.publisherHostPort = publisherHostPort;
+ }
+
+ public void setPublisherTopicName(String publisherTopicName) {
+ this.publisherTopicName = publisherTopicName;
+ }
+
+ public void setPublisherProtocol(String publisherProtocol) {
+ this.publisherProtocol = publisherProtocol;
+ }
+
+ public void setPublisherUserName(String publisherUserName) {
+ this.publisherUserName = publisherUserName;
+ }
+
+ public void setPublisherUserPassword(String publisherUserPassword) {
+ this.publisherUserPassword = publisherUserPassword;
+ }
+
+ public Boolean getEnableAAIEnrichment() {
+ return enableAAIEnrichment;
+ }
+
+ public String getAaiEnrichmentHost() {
+ return aaiEnrichmentHost;
+ }
+
+ public Integer getAaiEnrichmentPortNumber() {
+ return aaiEnrichmentPortNumber;
+ }
+
+ public String getAaiEnrichmentProtocol() {
+ return aaiEnrichmentProtocol;
+ }
+
+ public String getAaiEnrichmentUserName() {
+ return aaiEnrichmentUserName;
+ }
+
+ public String getAaiEnrichmentUserPassword() {
+ return aaiEnrichmentUserPassword;
+ }
+
+ public Boolean getAaiEnrichmentIgnoreSSLCertificateErrors() {
+ return aaiEnrichmentIgnoreSSLCertificateErrors;
+ }
+
+ public String getAaiVNFEnrichmentAPIPath() {
+ return aaiVNFEnrichmentAPIPath;
+ }
+
+ public String getAaiVMEnrichmentAPIPath() {
+ return aaiVMEnrichmentAPIPath;
+ }
+
+ public String getAaiEnrichmentProxyURL() {
+ return aaiEnrichmentProxyURL;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("subscriberHostName", subscriberHostName)
+ .add("subscriberHostPort", subscriberHostPort)
+ .add("subscriberTopicName", subscriberTopicName)
+ .add("subscriberProtocol", subscriberProtocol)
+ .add("subscriberUserName", subscriberUserName)
+ .add("subscriberContentType", subscriberContentType)
+ .add("subscriberConsumerId", subscriberConsumerId)
+ .add("subscriberConsumerGroup", subscriberConsumerGroup)
+ .add("subscriberTimeoutMS", subscriberTimeoutMS)
+ .add("subscriberMessageLimit", subscriberMessageLimit)
+ .add("subscriberPollingInterval", subscriberPollingInterval)
+ .add("publisherHostName", publisherHostName)
+ .add("publisherHostPort", publisherHostPort)
+ .add("publisherTopicName", publisherTopicName)
+ .add("publisherProtocol", publisherProtocol)
+ .add("publisherUserName", publisherUserName)
+ .add("publisherContentType", publisherContentType)
+ .add("publisherMaxBatchSize", publisherMaxBatchSize)
+ .add("publisherMaxRecoveryQueueSize", publisherMaxRecoveryQueueSize)
+ .add("publisherPollingInterval", publisherPollingInterval)
+ .add("enableAlertCEFFormat", enableAlertCEFFormat)
+ .add("enableAAIEnrichment", enableAAIEnrichment)
+ .add("aaiEnrichmentHost", aaiEnrichmentHost)
+ .add("aaiEnrichmentPortNumber", aaiEnrichmentPortNumber)
+ .add("aaiEnrichmentProtocol", aaiEnrichmentProtocol)
+ .add("aaiEnrichmentUserName", aaiEnrichmentUserName)
+ .add("aaiEnrichmentIgnoreSSLCertificateErrors", aaiEnrichmentIgnoreSSLCertificateErrors)
+ .add("aaiVNFEnrichmentAPIPath", aaiVNFEnrichmentAPIPath)
+ .add("aaiVMEnrichmentAPIPath", aaiVMEnrichmentAPIPath)
+ .add("aaiEnrichmentProxyEnabled", aaiEnrichmentProxyURL == null ? "false" : "true")
+ .toString();
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java index 9e27f22..1a7a7ea 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java @@ -1,36 +1,36 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.settings; - -import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; - -/** - * A wrapper over {@link TCAPolicy} to act as app Preferences as TCA Policy is passed - * by controller as runtime arguments from CDAP app preferences - * <p> - * @author Rajiv Singla . Creation Date: 11/29/2016. - */ -public class TCAPolicyPreferences extends TCAPolicy implements CDAPAppPreferences { - - private static final long serialVersionUID = 1L; - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.settings;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+
+/**
+ * A wrapper over {@link TCAPolicy} to act as app Preferences as TCA Policy is passed
+ * by controller as runtime arguments from CDAP app preferences
+ * <p>
+ * @author Rajiv Singla . Creation Date: 11/29/2016.
+ */
+public class TCAPolicyPreferences extends TCAPolicy implements CDAPAppPreferences {
+
+ private static final long serialVersionUID = 1L;
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java index 808b8ca..9993a2e 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java @@ -1,97 +1,97 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.utils; - -import com.google.common.base.Function; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; - -import javax.annotation.Nonnull; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; - - -/** - * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} - * <p> - * @author Rajiv Singla . Creation Date: 11/17/2016. - */ -public class AppPreferencesToPublisherConfigMapper implements Function<TCAAppPreferences, DMaaPMRPublisherConfig> { - - /** - * Factory method to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object - * - * @param tcaAppPreferences tca App Preferences - * - * @return publisher config object - */ - public static DMaaPMRPublisherConfig map(final TCAAppPreferences tcaAppPreferences) { - return new AppPreferencesToPublisherConfigMapper().apply(tcaAppPreferences); - } - - /** - * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object - * - * @param tcaAppPreferences tca App Preferences - * - * @return publisher config object - */ - @Nonnull - @Override - public DMaaPMRPublisherConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { - - // Create a new publisher settings builder - final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = new DMaaPMRPublisherConfig.Builder( - tcaAppPreferences.getPublisherHostName(), tcaAppPreferences.getPublisherTopicName()); - - // Setup up any optional publisher parameters if they are present - final Integer publisherHostPort = tcaAppPreferences.getPublisherHostPort(); - if (publisherHostPort != null) { - publisherConfigBuilder.setPortNumber(publisherHostPort); - } - final String publisherProtocol = tcaAppPreferences.getPublisherProtocol(); - if (isPresent(publisherProtocol)) { - publisherConfigBuilder.setProtocol(publisherProtocol); - } - final String publisherUserName = tcaAppPreferences.getPublisherUserName(); - if (isPresent(publisherUserName)) { - publisherConfigBuilder.setUserName(publisherUserName); - } - final String publisherUserPassword = tcaAppPreferences.getPublisherUserPassword(); - if (isPresent(publisherUserPassword)) { - publisherConfigBuilder.setUserPassword(publisherUserPassword); - } - final String publisherContentType = tcaAppPreferences.getPublisherContentType(); - if (isPresent(publisherContentType)) { - publisherConfigBuilder.setContentType(publisherContentType); - } - final Integer publisherMaxBatchSize = tcaAppPreferences.getPublisherMaxBatchSize(); - if (publisherMaxBatchSize != null) { - publisherConfigBuilder.setMaxBatchSize(publisherMaxBatchSize); - } - final Integer publisherMaxRecoveryQueueSize = tcaAppPreferences.getPublisherMaxRecoveryQueueSize(); - if (publisherMaxRecoveryQueueSize != null) { - publisherConfigBuilder.setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize); - } - - return publisherConfigBuilder.build(); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.utils;
+
+import com.google.common.base.Function;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+
+/**
+ * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 11/17/2016.
+ */
+public class AppPreferencesToPublisherConfigMapper implements Function<TCAAppPreferences, DMaaPMRPublisherConfig> {
+
+ /**
+ * Factory method to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object
+ *
+ * @param tcaAppPreferences tca App Preferences
+ *
+ * @return publisher config object
+ */
+ public static DMaaPMRPublisherConfig map(final TCAAppPreferences tcaAppPreferences) {
+ return new AppPreferencesToPublisherConfigMapper().apply(tcaAppPreferences);
+ }
+
+ /**
+ * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object
+ *
+ * @param tcaAppPreferences tca App Preferences
+ *
+ * @return publisher config object
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRPublisherConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) {
+
+ // Create a new publisher settings builder
+ final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = new DMaaPMRPublisherConfig.Builder(
+ tcaAppPreferences.getPublisherHostName(), tcaAppPreferences.getPublisherTopicName());
+
+ // Setup up any optional publisher parameters if they are present
+ final Integer publisherHostPort = tcaAppPreferences.getPublisherHostPort();
+ if (publisherHostPort != null) {
+ publisherConfigBuilder.setPortNumber(publisherHostPort);
+ }
+ final String publisherProtocol = tcaAppPreferences.getPublisherProtocol();
+ if (isPresent(publisherProtocol)) {
+ publisherConfigBuilder.setProtocol(publisherProtocol);
+ }
+ final String publisherUserName = tcaAppPreferences.getPublisherUserName();
+ if (isPresent(publisherUserName)) {
+ publisherConfigBuilder.setUserName(publisherUserName);
+ }
+ final String publisherUserPassword = tcaAppPreferences.getPublisherUserPassword();
+ if (isPresent(publisherUserPassword)) {
+ publisherConfigBuilder.setUserPassword(publisherUserPassword);
+ }
+ final String publisherContentType = tcaAppPreferences.getPublisherContentType();
+ if (isPresent(publisherContentType)) {
+ publisherConfigBuilder.setContentType(publisherContentType);
+ }
+ final Integer publisherMaxBatchSize = tcaAppPreferences.getPublisherMaxBatchSize();
+ if (publisherMaxBatchSize != null) {
+ publisherConfigBuilder.setMaxBatchSize(publisherMaxBatchSize);
+ }
+ final Integer publisherMaxRecoveryQueueSize = tcaAppPreferences.getPublisherMaxRecoveryQueueSize();
+ if (publisherMaxRecoveryQueueSize != null) {
+ publisherConfigBuilder.setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize);
+ }
+
+ return publisherConfigBuilder.build();
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java index e017b81..d7447f0 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java @@ -1,113 +1,113 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.utils; - -import com.google.common.base.Function; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; - -import javax.annotation.Nonnull; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; - - -/** - * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} - * - * @author Rajiv Singla . Creation Date: 11/17/2016. - */ -public class AppPreferencesToSubscriberConfigMapper implements Function<TCAAppPreferences, DMaaPMRSubscriberConfig> { - - /** - * Factory Method to converts {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object - * - * @param tcaAppPreferences tca app preferences - * @return DMaaP Subscriber Config - */ - public static DMaaPMRSubscriberConfig map(final TCAAppPreferences tcaAppPreferences) { - return new AppPreferencesToSubscriberConfigMapper().apply(tcaAppPreferences); - } - - /** - * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object - * - * @param tcaAppPreferences tca app preferences - * - * @return DMaaP Subscriber Config - */ - @Nonnull - @Override - public DMaaPMRSubscriberConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { - - // Create a new subscriber settings builder - final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder( - tcaAppPreferences.getSubscriberHostName(), tcaAppPreferences.getSubscriberTopicName()); - - // Setup up any optional subscriber parameters if they are present - final Integer subscriberHostPortNumber = tcaAppPreferences.getSubscriberHostPort(); - if (subscriberHostPortNumber != null) { - subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber); - } - - final String subscriberProtocol = tcaAppPreferences.getSubscriberProtocol(); - if (isPresent(subscriberProtocol)) { - subscriberConfigBuilder.setProtocol(subscriberProtocol); - } - - final String subscriberUserName = tcaAppPreferences.getSubscriberUserName(); - if (isPresent(subscriberUserName)) { - subscriberConfigBuilder.setUserName(subscriberUserName); - } - - final String subscriberUserPassword = tcaAppPreferences.getSubscriberUserPassword(); - if (isPresent(subscriberUserPassword)) { - subscriberConfigBuilder.setUserPassword(subscriberUserPassword); - } - - final String subscriberContentType = tcaAppPreferences.getSubscriberContentType(); - if (isPresent(subscriberContentType)) { - subscriberConfigBuilder.setContentType(subscriberContentType); - } - - final String subscriberConsumerId = tcaAppPreferences.getSubscriberConsumerId(); - if (isPresent(subscriberConsumerId)) { - subscriberConfigBuilder.setConsumerId(subscriberConsumerId); - } - - final String subscriberConsumerGroup = tcaAppPreferences.getSubscriberConsumerGroup(); - if (isPresent(subscriberConsumerGroup)) { - subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup); - } - - final Integer subscriberTimeoutMS = tcaAppPreferences.getSubscriberTimeoutMS(); - if (subscriberTimeoutMS != null) { - subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS); - } - final Integer subscriberMessageLimit = tcaAppPreferences.getSubscriberMessageLimit(); - if (subscriberMessageLimit != null) { - subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit); - } - - // return Subscriber settings - return subscriberConfigBuilder.build(); - - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.utils;
+
+import com.google.common.base.Function;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+
+/**
+ * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig}
+ *
+ * @author Rajiv Singla . Creation Date: 11/17/2016.
+ */
+public class AppPreferencesToSubscriberConfigMapper implements Function<TCAAppPreferences, DMaaPMRSubscriberConfig> {
+
+ /**
+ * Factory Method to converts {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object
+ *
+ * @param tcaAppPreferences tca app preferences
+ * @return DMaaP Subscriber Config
+ */
+ public static DMaaPMRSubscriberConfig map(final TCAAppPreferences tcaAppPreferences) {
+ return new AppPreferencesToSubscriberConfigMapper().apply(tcaAppPreferences);
+ }
+
+ /**
+ * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object
+ *
+ * @param tcaAppPreferences tca app preferences
+ *
+ * @return DMaaP Subscriber Config
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRSubscriberConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) {
+
+ // Create a new subscriber settings builder
+ final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder(
+ tcaAppPreferences.getSubscriberHostName(), tcaAppPreferences.getSubscriberTopicName());
+
+ // Setup up any optional subscriber parameters if they are present
+ final Integer subscriberHostPortNumber = tcaAppPreferences.getSubscriberHostPort();
+ if (subscriberHostPortNumber != null) {
+ subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber);
+ }
+
+ final String subscriberProtocol = tcaAppPreferences.getSubscriberProtocol();
+ if (isPresent(subscriberProtocol)) {
+ subscriberConfigBuilder.setProtocol(subscriberProtocol);
+ }
+
+ final String subscriberUserName = tcaAppPreferences.getSubscriberUserName();
+ if (isPresent(subscriberUserName)) {
+ subscriberConfigBuilder.setUserName(subscriberUserName);
+ }
+
+ final String subscriberUserPassword = tcaAppPreferences.getSubscriberUserPassword();
+ if (isPresent(subscriberUserPassword)) {
+ subscriberConfigBuilder.setUserPassword(subscriberUserPassword);
+ }
+
+ final String subscriberContentType = tcaAppPreferences.getSubscriberContentType();
+ if (isPresent(subscriberContentType)) {
+ subscriberConfigBuilder.setContentType(subscriberContentType);
+ }
+
+ final String subscriberConsumerId = tcaAppPreferences.getSubscriberConsumerId();
+ if (isPresent(subscriberConsumerId)) {
+ subscriberConfigBuilder.setConsumerId(subscriberConsumerId);
+ }
+
+ final String subscriberConsumerGroup = tcaAppPreferences.getSubscriberConsumerGroup();
+ if (isPresent(subscriberConsumerGroup)) {
+ subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup);
+ }
+
+ final Integer subscriberTimeoutMS = tcaAppPreferences.getSubscriberTimeoutMS();
+ if (subscriberTimeoutMS != null) {
+ subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS);
+ }
+ final Integer subscriberMessageLimit = tcaAppPreferences.getSubscriberMessageLimit();
+ if (subscriberMessageLimit != null) {
+ subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit);
+ }
+
+ // return Subscriber settings
+ return subscriberConfigBuilder.build();
+
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java index 89c5a84..986c301 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java @@ -1,294 +1,320 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.utils; - -import co.cask.cdap.api.RuntimeContext; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; -import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator; -import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo; -import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig; -import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn; -import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import static com.google.common.collect.Lists.newArrayList; -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH; - -/** - * Utility Helper methods for CDAP TCA sub module. - * - * <p> - * @author Rajiv Singla . Creation Date: 10/24/2016. - */ -public abstract class CDAPTCAUtils extends TCAUtils { - - private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class); - - /** - * Function that extracts alert message string from {@link TCAVESAlertEntity} - */ - public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION = - new Function<TCAVESAlertEntity, String>() { - @Override - public String apply(TCAVESAlertEntity alertEntity) { - return alertEntity == null ? null : alertEntity.getAlertMessage(); - } - }; - - - /** - * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object - * - * @param runtimeContext Runtime Context - * - * @return validated runtime arguments as {@link TCAAppPreferences} object - */ - public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) { - // Parse runtime arguments - final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); - final TCAAppPreferences tcaAppPreferences = - ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class); - - final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration(); - - // populate DMaaP Information from App Config String - populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences); - - // Validate runtime arguments - validateSettings(tcaAppPreferences, new TCAPreferencesValidator()); - - return tcaAppPreferences; - } - - /** - * Populated App Preferences DMaaP Information from Application Config String - * - * @param appConfigString CDAP Application config String - * @param tcaAppPreferences TCA App Preferences - */ - private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString, - final TCAAppPreferences tcaAppPreferences) { - - if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) { - LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config"); - } - - LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString); - - try { - final TCAControllerAppConfig tcaControllerAppConfig = - readValue(appConfigString, TCAControllerAppConfig.class); - - // Parse Subscriber DMaaP information from App Config String - if (tcaControllerAppConfig.getStreamsSubscribes() != null && - tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null && - tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) { - - final DMAAPInfo subscriberDmaapInfo = - tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo(); - LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl()); - final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl()); - tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol()); - tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost()); - final int subscriberUrlPort = subscriberUrl.getPort() != -1 ? - subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol()); - tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort); - tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8)); - - final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn(); - tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName()); - tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword()); - } else { - LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString); - } - - - // Parse Publisher DMaaP information from App Config String - if (tcaControllerAppConfig.getStreamsPublishes() != null && - tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null && - tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) { - - final DMAAPInfo publisherDmaapInfo = - tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo(); - LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl()); - final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl()); - tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol()); - tcaAppPreferences.setPublisherHostName(publisherUrl.getHost()); - final int publisherUrlPort = publisherUrl.getPort() != -1 ? - publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol()); - tcaAppPreferences.setPublisherHostPort(publisherUrlPort); - tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8)); - - final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut(); - tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName()); - tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword()); - } else { - LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString); - } - - - } catch (IOException e) { - throw new CDAPSettingsException( - "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e); - } - } - - /** - * Parses provided DMaaP MR URL string to {@link URL} object - * - * @param urlString url string - * - * @return url object - */ - private static URL parseURL(final String urlString) { - try { - return new URL(urlString); - } catch (MalformedURLException e) { - final String errorMessage = String.format("Invalid URL format: %s", urlString); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - - /** - * Sets up default DMaaP Port if not provided with DMaaP URL - * - * @param protocol protocol e.g. http or https - * - * @return default DMaaP MR port number - */ - private static int getDefaultDMaaPPort(final String protocol) { - if ("http".equals(protocol)) { - return 3904; - } else if ("https".equals(protocol)) { - return 3905; - } else { - return 80; - } - } - - - /** - * Extracts alert message strings from {@link TCAVESAlertEntity} - * - * @param alertEntities collection of alert entities - * - * @return List of alert message strings - */ - public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) { - return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION); - } - - - /** - * Converts Runtime Arguments to {@link TCAPolicyPreferences} object - * - * @param runtimeContext CDAP Runtime Arguments - * - * @return TCA Policy Preferences - */ - public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) { - - final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); - final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments); - - LOG.debug("Printing all Received Runtime Arguments:"); - for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) { - LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue()); - } - - TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences(); - - final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY); - - if (StringUtils.isNotBlank(tcaPolicyJsonString)) { - - LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}", - AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString); - - // initialize unquotedTCAPolicy - String unquotedTCAPolicy = tcaPolicyJsonString.trim(); - - //remove starting and ending quote from passed tca policy Json string if present - if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && - tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) { - unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1); - } - - try { - tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class); - } catch (IOException e) { - throw new CDAPSettingsException( - "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e); - } - - } else { // classical controller is being used. Validate preferences as received from classical controller - - LOG.info("TcaPolicy is being parsed as key value pair from classical controller"); - - // extract TCA Policy Domain from Runtime Arguments - final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH); - - // create new TCA Policy object - tcaPolicyPreferences.setDomain(policyDomain); - - // filter out other non relevant fields which are not related to tca policy - final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments, - TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH); - - // determine functional Roles - final Map<String, Map<String, String>> functionalRolesMap = - extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); - - // create metrics per functional role list - tcaPolicyPreferences.setMetricsPerEventName( - createTCAPolicyMetricsPerEventNameList(functionalRolesMap)); - - } - - // validate tca Policy Preferences - validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator()); - - LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences); - - return tcaPolicyPreferences; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.utils;
+
+import co.cask.cdap.api.RuntimeContext;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig;
+import org.openecomp.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfigBuilder;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator;
+import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings;
+import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH;
+
+/**
+ * Utility Helper methods for CDAP TCA sub module.
+ *
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+public abstract class CDAPTCAUtils extends TCAUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class);
+
+ /**
+ * Function that extracts alert message string from {@link TCAVESAlertEntity}
+ */
+ public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION =
+ new Function<TCAVESAlertEntity, String>() {
+ @Override
+ public String apply(TCAVESAlertEntity alertEntity) {
+ return alertEntity == null ? null : alertEntity.getAlertMessage();
+ }
+ };
+
+
+ /**
+ * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object
+ *
+ * @param runtimeContext Runtime Context
+ *
+ * @return validated runtime arguments as {@link TCAAppPreferences} object
+ */
+ public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) {
+ // Parse runtime arguments
+ final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();
+ final TCAAppPreferences tcaAppPreferences =
+ ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class);
+
+ final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration();
+
+ // populate DMaaP Information from App Config String
+ populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences);
+
+ // Validate runtime arguments
+ validateSettings(tcaAppPreferences, new TCAPreferencesValidator());
+
+ return tcaAppPreferences;
+ }
+
+ /**
+ * Creates an A&AI Http Client config from give {@link TCAAppPreferences}
+ *
+ * @param tcaAppPreferences TCA App Preferences
+ *
+ * @return A&AI Http Client config
+ */
+ public static AAIHttpClientConfig createAAIEnrichmentClientConfig(final TCAAppPreferences tcaAppPreferences) {
+ final String aaiEnrichmentProxyURLString = tcaAppPreferences.getAaiEnrichmentProxyURL();
+ URL aaiEnrichmentProxyURL = null;
+ if (StringUtils.isNotBlank(aaiEnrichmentProxyURLString)) {
+ aaiEnrichmentProxyURL = parseURL(aaiEnrichmentProxyURLString);
+ }
+
+ return new AAIHttpClientConfigBuilder(tcaAppPreferences.getAaiEnrichmentHost())
+ .setAaiProtocol(tcaAppPreferences.getAaiEnrichmentProtocol())
+ .setAaiHostPortNumber(tcaAppPreferences.getAaiEnrichmentPortNumber())
+ .setAaiUserName(tcaAppPreferences.getAaiEnrichmentUserName())
+ .setAaiUserPassword(tcaAppPreferences.getAaiEnrichmentUserPassword())
+ .setAaiProxyURL(aaiEnrichmentProxyURL)
+ .setAaiIgnoreSSLCertificateErrors(tcaAppPreferences.getAaiEnrichmentIgnoreSSLCertificateErrors())
+ .build();
+ }
+
+ /**
+ * Populated App Preferences DMaaP Information from Application Config String
+ *
+ * @param appConfigString CDAP Application config String
+ * @param tcaAppPreferences TCA App Preferences
+ */
+ private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,
+ final TCAAppPreferences tcaAppPreferences) {
+
+ if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {
+ LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");
+ }
+
+ LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);
+
+ try {
+ final TCAControllerAppConfig tcaControllerAppConfig =
+ readValue(appConfigString, TCAControllerAppConfig.class);
+
+ // Parse Subscriber DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsSubscribes() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {
+
+ final DMAAPInfo subscriberDmaapInfo =
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();
+ LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());
+ final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());
+ final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?
+ subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);
+ tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));
+
+ final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();
+ tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());
+ tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ // Parse Publisher DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsPublishes() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {
+
+ final DMAAPInfo publisherDmaapInfo =
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();
+ LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());
+ final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());
+ final int publisherUrlPort = publisherUrl.getPort() != -1 ?
+ publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostPort(publisherUrlPort);
+ tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));
+
+ final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();
+ tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());
+ tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);
+ }
+ }
+
+ /**
+ * Parses provided DMaaP MR URL string to {@link URL} object
+ *
+ * @param urlString url string
+ *
+ * @return url object
+ */
+ private static URL parseURL(final String urlString) {
+ try {
+ return new URL(urlString);
+ } catch (MalformedURLException e) {
+ final String errorMessage = String.format("Invalid URL format: %s", urlString);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ /**
+ * Sets up default DMaaP Port if not provided with DMaaP URL
+ *
+ * @param protocol protocol e.g. http or https
+ *
+ * @return default DMaaP MR port number
+ */
+ private static int getDefaultDMaaPPort(final String protocol) {
+ if ("http".equals(protocol)) {
+ return 3904;
+ } else if ("https".equals(protocol)) {
+ return 3905;
+ } else {
+ return 80;
+ }
+ }
+
+
+ /**
+ * Extracts alert message strings from {@link TCAVESAlertEntity}
+ *
+ * @param alertEntities collection of alert entities
+ *
+ * @return List of alert message strings
+ */
+ public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) {
+ return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION);
+ }
+
+
+ /**
+ * Converts Runtime Arguments to {@link TCAPolicyPreferences} object
+ *
+ * @param runtimeContext CDAP Runtime Arguments
+ *
+ * @return TCA Policy Preferences
+ */
+ public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) {
+
+ final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();
+ final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments);
+
+ LOG.debug("Printing all Received Runtime Arguments:");
+ for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) {
+ LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue());
+ }
+
+ TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences();
+
+ final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
+
+ if (StringUtils.isNotBlank(tcaPolicyJsonString)) {
+
+ LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}",
+ AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString);
+
+ // initialize unquotedTCAPolicy
+ String unquotedTCAPolicy = tcaPolicyJsonString.trim();
+
+ //remove starting and ending quote from passed tca policy Json string if present
+ if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) &&
+ tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
+ unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1);
+ }
+
+ try {
+ tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class);
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e);
+ }
+
+ } else { // classical controller is being used. Validate preferences as received from classical controller
+
+ LOG.info("TcaPolicy is being parsed as key value pair from classical controller");
+
+ // extract TCA Policy Domain from Runtime Arguments
+ final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH);
+
+ // create new TCA Policy object
+ tcaPolicyPreferences.setDomain(policyDomain);
+
+ // filter out other non relevant fields which are not related to tca policy
+ final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments,
+ TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH);
+
+ // determine functional Roles
+ final Map<String, Map<String, String>> functionalRolesMap =
+ extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER);
+
+ // create metrics per functional role list
+ tcaPolicyPreferences.setMetricsPerEventName(
+ createTCAPolicyMetricsPerEventNameList(functionalRolesMap));
+
+ }
+
+ // validate tca Policy Preferences
+ validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator());
+
+ LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences);
+
+ return tcaPolicyPreferences;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java index 23e4c8a..fe44c1a 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java @@ -1,62 +1,62 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.validator; - -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; - -/** - * <p> - * TCA App Config Validator validates any TCA App Config parameter values - * </p> - * - * @author Rajiv Singla . Creation Date: 10/24/2016. - */ -public class TCAAppConfigValidator implements CDAPAppSettingsValidator<TCAAppConfig, - GenericValidationResponse<TCAAppConfig>> { - - private static final long serialVersionUID = 1L; - - @Override - public GenericValidationResponse<TCAAppConfig> validateAppSettings(TCAAppConfig tcaAppConfig) { - - final GenericValidationResponse<TCAAppConfig> validationResponse = new GenericValidationResponse<>(); - - if (isEmpty(tcaAppConfig.getTcaSubscriberOutputStreamName())) { - validationResponse.addErrorMessage("tcaSubscriberOutputStreamName", - "tcaSubscriberOutputStreamName must be present"); - } - - if (isEmpty(tcaAppConfig.getTcaVESMessageStatusTableName())) { - validationResponse.addErrorMessage("tcaVESMessageStatusTableName", - "tcaVESMessageStatusTableName must be present"); - } - if (isEmpty(tcaAppConfig.getTcaVESAlertsTableName())) { - validationResponse.addErrorMessage("tcaVESAlertsTableName", - "tcaVESAlertsTableName must be present"); - } - - return validationResponse; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+
+/**
+ * <p>
+ * TCA App Config Validator validates any TCA App Config parameter values
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+public class TCAAppConfigValidator implements CDAPAppSettingsValidator<TCAAppConfig,
+ GenericValidationResponse<TCAAppConfig>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public GenericValidationResponse<TCAAppConfig> validateAppSettings(TCAAppConfig tcaAppConfig) {
+
+ final GenericValidationResponse<TCAAppConfig> validationResponse = new GenericValidationResponse<>();
+
+ if (isEmpty(tcaAppConfig.getTcaSubscriberOutputStreamName())) {
+ validationResponse.addErrorMessage("tcaSubscriberOutputStreamName",
+ "tcaSubscriberOutputStreamName must be present");
+ }
+
+ if (isEmpty(tcaAppConfig.getTcaVESMessageStatusTableName())) {
+ validationResponse.addErrorMessage("tcaVESMessageStatusTableName",
+ "tcaVESMessageStatusTableName must be present");
+ }
+ if (isEmpty(tcaAppConfig.getTcaVESAlertsTableName())) {
+ validationResponse.addErrorMessage("tcaVESAlertsTableName",
+ "tcaVESAlertsTableName must be present");
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java index ceb1857..7b5c9cf 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java @@ -1,115 +1,115 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.validator; - -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; - -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; - -/** - * Validates TCA Policy Preferences - * <p> - * - * @author Rajiv Singla . Creation Date: 11/29/2016. - */ -public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<TCAPolicyPreferences, - GenericValidationResponse<TCAPolicyPreferences>> { - - private static final long serialVersionUID = 1L; - - @Override - public GenericValidationResponse<TCAPolicyPreferences> validateAppSettings( - final TCAPolicyPreferences tcaPolicyPreferences) { - - final GenericValidationResponse<TCAPolicyPreferences> validationResponse = new GenericValidationResponse<>(); - - // validate TCA Policy must domain present - final String domain = tcaPolicyPreferences.getDomain(); - if (isEmpty(domain)) { - validationResponse.addErrorMessage("domain", "TCA Policy must have only one domain present"); - } - - // validate TCA Policy must have at least one event name - final List<String> policyEventNames = TCAUtils.getPolicyEventNames(tcaPolicyPreferences); - if (policyEventNames.isEmpty()) { - validationResponse.addErrorMessage("metricsPerEventNames", - "TCA Policy must have at least one or more event names"); - } - - final List<MetricsPerEventName> metricsPerEventNames = - tcaPolicyPreferences.getMetricsPerEventName(); - - // validate Metrics Per Event Name - for (MetricsPerEventName metricsPerEventName : metricsPerEventNames) { - - // event name must be present - final String eventName = metricsPerEventName.getEventName(); - if (isEmpty(eventName)) { - validationResponse.addErrorMessage("eventName", - "TCA Policy eventName is not present for metricsPerEventName:" + metricsPerEventName); - } - - // control Loop Schema type must be present - final ControlLoopSchemaType controlLoopSchemaType = metricsPerEventName.getControlLoopSchemaType(); - if (controlLoopSchemaType == null) { - validationResponse.addErrorMessage("controlLoopEventType", - "TCA Policy controlLoopSchemaType is not present for metricsPerEventName:" - + metricsPerEventName); - } - - // must have at least 1 threshold defined - if (metricsPerEventName.getThresholds() == null || metricsPerEventName.getThresholds().isEmpty()) { - validationResponse.addErrorMessage("thresholds", - "TCA Policy event Name must have at least one threshold. " + - "Event Name causing this validation error:" + metricsPerEventName); - } else { - // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity - final List<Threshold> eventNameThresholds = metricsPerEventName.getThresholds(); - for (Threshold eventNameThreshold : eventNameThresholds) { - final String fieldPath = eventNameThreshold.getFieldPath(); - final Long thresholdValue = eventNameThreshold.getThresholdValue(); - final Direction direction = eventNameThreshold.getDirection(); - final EventSeverity severity = eventNameThreshold.getSeverity(); - final ControlLoopEventStatus closedLoopEventStatus = eventNameThreshold.getClosedLoopEventStatus(); - if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null || - closedLoopEventStatus == null) { - validationResponse.addErrorMessage("threshold", - "TCA Policy threshold must have fieldPath,thresholdValue,direction, " + - "closedLoopEventStatus and severity defined." + - "Threshold causing this validation error:" + eventNameThreshold); - } - } - } - } - return validationResponse; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+
+/**
+ * Validates TCA Policy Preferences
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/29/2016.
+ */
+public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<TCAPolicyPreferences,
+ GenericValidationResponse<TCAPolicyPreferences>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public GenericValidationResponse<TCAPolicyPreferences> validateAppSettings(
+ final TCAPolicyPreferences tcaPolicyPreferences) {
+
+ final GenericValidationResponse<TCAPolicyPreferences> validationResponse = new GenericValidationResponse<>();
+
+ // validate TCA Policy must domain present
+ final String domain = tcaPolicyPreferences.getDomain();
+ if (isEmpty(domain)) {
+ validationResponse.addErrorMessage("domain", "TCA Policy must have only one domain present");
+ }
+
+ // validate TCA Policy must have at least one event name
+ final List<String> policyEventNames = TCAUtils.getPolicyEventNames(tcaPolicyPreferences);
+ if (policyEventNames.isEmpty()) {
+ validationResponse.addErrorMessage("metricsPerEventNames",
+ "TCA Policy must have at least one or more event names");
+ }
+
+ final List<MetricsPerEventName> metricsPerEventNames =
+ tcaPolicyPreferences.getMetricsPerEventName();
+
+ // validate Metrics Per Event Name
+ for (MetricsPerEventName metricsPerEventName : metricsPerEventNames) {
+
+ // event name must be present
+ final String eventName = metricsPerEventName.getEventName();
+ if (isEmpty(eventName)) {
+ validationResponse.addErrorMessage("eventName",
+ "TCA Policy eventName is not present for metricsPerEventName:" + metricsPerEventName);
+ }
+
+ // control Loop Schema type must be present
+ final ControlLoopSchemaType controlLoopSchemaType = metricsPerEventName.getControlLoopSchemaType();
+ if (controlLoopSchemaType == null) {
+ validationResponse.addErrorMessage("controlLoopEventType",
+ "TCA Policy controlLoopSchemaType is not present for metricsPerEventName:"
+ + metricsPerEventName);
+ }
+
+ // must have at least 1 threshold defined
+ if (metricsPerEventName.getThresholds() == null || metricsPerEventName.getThresholds().isEmpty()) {
+ validationResponse.addErrorMessage("thresholds",
+ "TCA Policy event Name must have at least one threshold. " +
+ "Event Name causing this validation error:" + metricsPerEventName);
+ } else {
+ // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity
+ final List<Threshold> eventNameThresholds = metricsPerEventName.getThresholds();
+ for (Threshold eventNameThreshold : eventNameThresholds) {
+ final String fieldPath = eventNameThreshold.getFieldPath();
+ final Long thresholdValue = eventNameThreshold.getThresholdValue();
+ final Direction direction = eventNameThreshold.getDirection();
+ final EventSeverity severity = eventNameThreshold.getSeverity();
+ final ClosedLoopEventStatus closedLoopEventStatus = eventNameThreshold.getClosedLoopEventStatus();
+ if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null ||
+ closedLoopEventStatus == null) {
+ validationResponse.addErrorMessage("threshold",
+ "TCA Policy threshold must have fieldPath,thresholdValue,direction, " +
+ "closedLoopEventStatus and severity defined." +
+ "Threshold causing this validation error:" + eventNameThreshold);
+ }
+ }
+ }
+ }
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java index c74463b..498ca85 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java @@ -1,65 +1,84 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.validator; - -import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; - -import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; - -/** - * - * @author Rajiv Singla . Creation Date: 11/3/2016. - */ -public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppPreferences, - GenericValidationResponse<TCAAppPreferences>> { - - private static final long serialVersionUID = 1L; - - @Override - public GenericValidationResponse<TCAAppPreferences> validateAppSettings(TCAAppPreferences appPreferences) { - - final GenericValidationResponse<TCAAppPreferences> validationResponse = new GenericValidationResponse<>(); - - // subscriber validations - final String subscriberHostName = appPreferences.getSubscriberHostName(); - if (isEmpty(subscriberHostName)) { - validationResponse.addErrorMessage("subscriberHostName", "Subscriber host name must be present"); - } - final String subscriberTopicName = appPreferences.getSubscriberTopicName(); - if (isEmpty(subscriberTopicName)) { - validationResponse.addErrorMessage("subscriberTopicName", "Subscriber topic name must be present"); - } - - // publisher validations - final String publisherHostName = appPreferences.getPublisherHostName(); - if (isEmpty(publisherHostName)) { - validationResponse.addErrorMessage("publisherHostName", "Publisher host name must be present"); - } - final String publisherTopicName = appPreferences.getPublisherTopicName(); - if (isEmpty(publisherTopicName)) { - validationResponse.addErrorMessage("publisherTopicName", "Publisher topic name must be present"); - } - - return validationResponse; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.validator;
+
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+
+/**
+ *
+ * @author Rajiv Singla . Creation Date: 11/3/2016.
+ */
+public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppPreferences,
+ GenericValidationResponse<TCAAppPreferences>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public GenericValidationResponse<TCAAppPreferences> validateAppSettings(TCAAppPreferences appPreferences) {
+
+ final GenericValidationResponse<TCAAppPreferences> validationResponse = new GenericValidationResponse<>();
+
+ // subscriber validations
+ final String subscriberHostName = appPreferences.getSubscriberHostName();
+ if (isEmpty(subscriberHostName)) {
+ validationResponse.addErrorMessage("subscriberHostName", "Subscriber host name must be present");
+ }
+ final String subscriberTopicName = appPreferences.getSubscriberTopicName();
+ if (isEmpty(subscriberTopicName)) {
+ validationResponse.addErrorMessage("subscriberTopicName", "Subscriber topic name must be present");
+ }
+
+ // publisher validations
+ final String publisherHostName = appPreferences.getPublisherHostName();
+ if (isEmpty(publisherHostName)) {
+ validationResponse.addErrorMessage("publisherHostName", "Publisher host name must be present");
+ }
+ final String publisherTopicName = appPreferences.getPublisherTopicName();
+ if (isEmpty(publisherTopicName)) {
+ validationResponse.addErrorMessage("publisherTopicName", "Publisher topic name must be present");
+ }
+
+ final Boolean enableAAIEnrichment = appPreferences.getEnableAAIEnrichment();
+
+ // if aai enrichment is enabled then do some aai validations
+ if (enableAAIEnrichment) {
+ final String aaiEnrichmentHost = appPreferences.getAaiEnrichmentHost();
+ if (isEmpty(aaiEnrichmentHost)) {
+ validationResponse.addErrorMessage("aaiEnrichmentHost", "AAI Enrichment Host must be present");
+ }
+ final String aaiVMEnrichmentAPIPath = appPreferences.getAaiVMEnrichmentAPIPath();
+ if (isEmpty(aaiVMEnrichmentAPIPath)) {
+ validationResponse.addErrorMessage("aaiVMEnrichmentAPIPath", "AAI VM Enrichment path must be present");
+ }
+ final String aaiVNFEnrichmentAPIPath = appPreferences.getAaiVNFEnrichmentAPIPath();
+ if (isEmpty(aaiVNFEnrichmentAPIPath)) {
+ validationResponse.addErrorMessage("aaiVNFEnrichmentAPIPath", "AAI VNF Enrichment path must be " +
+ "present");
+ }
+ }
+
+ return validationResponse;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java index 6623321..348f392 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java @@ -1,116 +1,116 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.worker.AbstractWorker; -import com.google.common.base.Preconditions; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; - -import static java.lang.String.format; - -/** - * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals - * <p> - * @author Rajiv Singla . Creation Date: 12/19/2016. - */ -public abstract class BaseTCADMaaPMRWorker extends AbstractWorker { - - private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class); - - /** - * Quartz Scheduler - */ - protected Scheduler scheduler; - /** - * Determines if scheduler is shutdown - */ - protected AtomicBoolean isSchedulerShutdown; - - - @Override - public void run() { - - Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); - String schedulerName = ""; - - // Start scheduler - try { - schedulerName = scheduler.getSchedulerName(); - scheduler.start(); - isSchedulerShutdown.getAndSet(false); - - } catch (SchedulerException e) { - final String errorMessage = - format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName); - - // indefinite loop which wakes up and confirms scheduler is indeed running - while (!isSchedulerShutdown.get()) { - try { - - Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS); - - } catch (InterruptedException e) { - - final String errorMessage = - format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s", - schedulerName, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - - LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName); - - } - - @Override - public void stop() { - - Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); - String schedulerName = ""; - - // Stop Scheduler - try { - schedulerName = scheduler.getSchedulerName(); - LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName); - scheduler.shutdown(); - isSchedulerShutdown.getAndSet(true); - - } catch (SchedulerException e) { - - final String errorMessage = - format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.worker.AbstractWorker;
+import com.google.common.base.Preconditions;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+
+/**
+ * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals
+ * <p>
+ * @author Rajiv Singla . Creation Date: 12/19/2016.
+ */
+public abstract class BaseTCADMaaPMRWorker extends AbstractWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class);
+
+ /**
+ * Quartz Scheduler
+ */
+ protected Scheduler scheduler;
+ /**
+ * Determines if scheduler is shutdown
+ */
+ protected AtomicBoolean isSchedulerShutdown;
+
+
+ @Override
+ public void run() {
+
+ Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
+ String schedulerName = "";
+
+ // Start scheduler
+ try {
+ schedulerName = scheduler.getSchedulerName();
+ scheduler.start();
+ isSchedulerShutdown.getAndSet(false);
+
+ } catch (SchedulerException e) {
+ final String errorMessage =
+ format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName);
+
+ // indefinite loop which wakes up and confirms scheduler is indeed running
+ while (!isSchedulerShutdown.get()) {
+ try {
+
+ Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS);
+
+ } catch (InterruptedException e) {
+
+ final String errorMessage =
+ format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s",
+ schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName);
+
+ }
+
+ @Override
+ public void stop() {
+
+ Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
+ String schedulerName = "";
+
+ // Stop Scheduler
+ try {
+ schedulerName = scheduler.getSchedulerName();
+ LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName);
+ scheduler.shutdown();
+ isSchedulerShutdown.getAndSet(true);
+
+ } catch (SchedulerException e) {
+
+ final String errorMessage =
+ format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java index ce4ccbe..7c8e3c6 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java @@ -1,200 +1,200 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.TxRunnable; -import co.cask.cdap.api.common.Bytes; -import co.cask.cdap.api.data.DatasetContext; -import co.cask.cdap.api.dataset.lib.CloseableIterator; -import co.cask.cdap.api.dataset.lib.KeyValue; -import co.cask.cdap.api.dataset.lib.ObjectMappedTable; -import co.cask.cdap.api.metrics.Metrics; -import co.cask.cdap.api.worker.WorkerContext; -import com.google.common.base.Joiner; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.apache.tephra.TransactionFailureException; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; -import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; -import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.PersistJobDataAfterExecution; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME; -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME; - -/** - * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to - * DMaaP MR topic - *<p> - * @author Rajiv Singla . Creation Date: 11/17/2016. - */ -@DisallowConcurrentExecution -@PersistJobDataAfterExecution -@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") -public class TCADMaaPMRPublisherJob implements Job { - - private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class); - - @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - - LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}", - jobExecutionContext.getNextFireTime()); - - // Get Job Data Map - final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); - - // Fetch all Job Params from Job Data Map - final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME); - final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME); - final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME); - final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME); - - LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName); - - // Get new alerts from alerts table - final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext); - - // If no new alerts are found - nothing to publish - if (newAlertsMap.isEmpty()) { - LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName); - metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1); - return; - } - - final int newAlertsCount = newAlertsMap.size(); - LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName, - newAlertsCount); - metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount); - - // Get alert message strings from alert Entities - final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values()); - - // Publish messages to DMaaP MR Topic - try { - - final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages); - - final Integer responseCode = publisherResponse.getResponseCode(); - final String responseMessage = publisherResponse.getResponseMessage(); - final int pendingMessagesCount = publisherResponse.getPendingMessagesCount(); - - LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode, - responseMessage, pendingMessagesCount); - - if (HTTPUtils.isSuccessfulResponseCode(responseCode)) { - LOG.debug("Successfully Published alerts to DMaaP MR Topic."); - metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); - } else { - LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later...."); - metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); - } - - } catch (DCAEAnalyticsRuntimeException e) { - LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e); - } finally { - // delete send message from alerts table - deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics); - } - - LOG.debug("Finished DMaaP MR Topic Publisher fetch Job."); - - } - - /** - * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values - * - * @param cdapAlertsTableName alerts table name - * @param workerContext worker context - * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values - */ - protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName, - final WorkerContext workerContext) { - final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>(); - try { - workerContext.execute(new TxRunnable() { - @Override - public void run(DatasetContext context) throws Exception { - final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); - final Date currentTime = new Date(); - final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime); - final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey); - while (scan.hasNext()) { - final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next(); - newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue()); - } - } - }); - } catch (TransactionFailureException e) { - final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString(); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - return newAlertsMap; - } - - /** - * Deletes rows in Alerts table for give rowKeys - * - * @param cdapAlertsTableName CDAP Alerts Table Name - * @param workerContext Worker Context - * @param rowKeys Row Key Set - * @param metrics CDAP metrics - */ - protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext, - final Set<String> rowKeys, final Metrics metrics) { - LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys)); - try { - workerContext.execute(new TxRunnable() { - @Override - public void run(DatasetContext context) throws Exception { - final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); - for (String rowKey : rowKeys) { - alertsTable.delete(rowKey); - metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1); - } - } - }); - } catch (TransactionFailureException e) { - final String errorMessage = - "Transaction Error while deleting published alerts in alerts table: " + e.toString(); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.TxRunnable;
+import co.cask.cdap.api.common.Bytes;
+import co.cask.cdap.api.data.DatasetContext;
+import co.cask.cdap.api.dataset.lib.CloseableIterator;
+import co.cask.cdap.api.dataset.lib.KeyValue;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.google.common.base.Joiner;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.tephra.TransactionFailureException;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME;
+import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME;
+import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME;
+import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME;
+
+/**
+ * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to
+ * DMaaP MR topic
+ *<p>
+ * @author Rajiv Singla . Creation Date: 11/17/2016.
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
+public class TCADMaaPMRPublisherJob implements Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class);
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+ LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}",
+ jobExecutionContext.getNextFireTime());
+
+ // Get Job Data Map
+ final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
+
+ // Fetch all Job Params from Job Data Map
+ final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME);
+ final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME);
+ final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME);
+ final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME);
+
+ LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName);
+
+ // Get new alerts from alerts table
+ final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext);
+
+ // If no new alerts are found - nothing to publish
+ if (newAlertsMap.isEmpty()) {
+ LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1);
+ return;
+ }
+
+ final int newAlertsCount = newAlertsMap.size();
+ LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName,
+ newAlertsCount);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount);
+
+ // Get alert message strings from alert Entities
+ final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values());
+
+ // Publish messages to DMaaP MR Topic
+ try {
+
+ final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages);
+
+ final Integer responseCode = publisherResponse.getResponseCode();
+ final String responseMessage = publisherResponse.getResponseMessage();
+ final int pendingMessagesCount = publisherResponse.getPendingMessagesCount();
+
+ LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode,
+ responseMessage, pendingMessagesCount);
+
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("Successfully Published alerts to DMaaP MR Topic.");
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
+ } else {
+ LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later....");
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
+ }
+
+ } catch (DCAEAnalyticsRuntimeException e) {
+ LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e);
+ } finally {
+ // delete send message from alerts table
+ deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics);
+ }
+
+ LOG.debug("Finished DMaaP MR Topic Publisher fetch Job.");
+
+ }
+
+ /**
+ * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values
+ *
+ * @param cdapAlertsTableName alerts table name
+ * @param workerContext worker context
+ * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values
+ */
+ protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName,
+ final WorkerContext workerContext) {
+ final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>();
+ try {
+ workerContext.execute(new TxRunnable() {
+ @Override
+ public void run(DatasetContext context) throws Exception {
+ final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
+ final Date currentTime = new Date();
+ final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime);
+ final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey);
+ while (scan.hasNext()) {
+ final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next();
+ newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue());
+ }
+ }
+ });
+ } catch (TransactionFailureException e) {
+ final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ return newAlertsMap;
+ }
+
+ /**
+ * Deletes rows in Alerts table for give rowKeys
+ *
+ * @param cdapAlertsTableName CDAP Alerts Table Name
+ * @param workerContext Worker Context
+ * @param rowKeys Row Key Set
+ * @param metrics CDAP metrics
+ */
+ protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext,
+ final Set<String> rowKeys, final Metrics metrics) {
+ LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys));
+ try {
+ workerContext.execute(new TxRunnable() {
+ @Override
+ public void run(DatasetContext context) throws Exception {
+ final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
+ for (String rowKey : rowKeys) {
+ alertsTable.delete(rowKey);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1);
+ }
+ }
+ });
+ } catch (TransactionFailureException e) {
+ final String errorMessage =
+ "Transaction Error while deleting published alerts in alerts table: " + e.toString();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java index 9fb9d83..d21be2d 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java @@ -1,114 +1,114 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.metrics.Metrics; -import co.cask.cdap.api.worker.WorkerContext; -import com.google.common.base.Optional; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; -import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.PersistJobDataAfterExecution; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -import static java.lang.String.format; - -/** - * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to - * a given CDAP Stream - * - * @author Rajiv Singla . Creation Date: 10/24/2016. - */ -@DisallowConcurrentExecution -@PersistJobDataAfterExecution -public class TCADMaaPMRSubscriberJob implements Job { - - private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class); - - @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - - LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}", - jobExecutionContext.getNextFireTime()); - - // Get Job Data Map - final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); - - // Fetch all Job Params from Job Data Map - final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME); - final WorkerContext workerContext = - (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME); - final DMaaPMRSubscriber subscriber = - (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME); - final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME); - - final Optional<List<String>> subscriberMessagesOptional = - DMaaPMRUtils.getSubscriberMessages(subscriber, metrics); - - // Write message to CDAP Stream using Stream Writer - if (subscriberMessagesOptional.isPresent()) { - writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics); - } - } - - - /** - * Writes given messages to CDAP Stream - * - * @param actualMessages List of messages that need to written to cdap stream - * @param cdapStreamName cdap stream name - * @param workerContext cdap worker context - * @param metrics cdap metrics - */ - private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName, - final WorkerContext workerContext, final Metrics metrics) { - LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size()); - try { - - for (String message : actualMessages) { - workerContext.write(cdapStreamName, message); - } - - } catch (IOException e) { - metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1); - final String errorMessage = - format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " + - "Exception: %s", cdapStreamName, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}", - cdapStreamName, actualMessages.size()); - - } - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.google.common.base.Optional;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to
+ * a given CDAP Stream
+ *
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+public class TCADMaaPMRSubscriberJob implements Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class);
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+ LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}",
+ jobExecutionContext.getNextFireTime());
+
+ // Get Job Data Map
+ final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
+
+ // Fetch all Job Params from Job Data Map
+ final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME);
+ final WorkerContext workerContext =
+ (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME);
+ final DMaaPMRSubscriber subscriber =
+ (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME);
+ final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME);
+
+ final Optional<List<String>> subscriberMessagesOptional =
+ DMaaPMRUtils.getSubscriberMessages(subscriber, metrics);
+
+ // Write message to CDAP Stream using Stream Writer
+ if (subscriberMessagesOptional.isPresent()) {
+ writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics);
+ }
+ }
+
+
+ /**
+ * Writes given messages to CDAP Stream
+ *
+ * @param actualMessages List of messages that need to written to cdap stream
+ * @param cdapStreamName cdap stream name
+ * @param workerContext cdap worker context
+ * @param metrics cdap metrics
+ */
+ private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName,
+ final WorkerContext workerContext, final Metrics metrics) {
+ LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size());
+ try {
+
+ for (String message : actualMessages) {
+ workerContext.write(cdapStreamName, message);
+ }
+
+ } catch (IOException e) {
+ metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1);
+ final String errorMessage =
+ format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " +
+ "Exception: %s", cdapStreamName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}",
+ cdapStreamName, actualMessages.size());
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java index 4d721e2..12a52f2 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java @@ -1,141 +1,141 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.worker.AbstractWorker; -import co.cask.cdap.api.worker.WorkerContext; -import com.fasterxml.jackson.core.type.TypeReference; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue; -import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString; - -/** - * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic. - * The mock instead of making DMaaP MR calls will actually take messages - * from file and send them to stream at subscriber polling interval - * - * TODO: To be removed before going to production - only for testing purposes - * - * @author Rajiv Singla . Creation Date: 11/4/2016. - */ -public class TCADMaaPMockSubscriberWorker extends AbstractWorker { - - private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class); - - // TODO: Remove this file before going to production - only for mocking purposes - private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json"; - private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE = - new TypeReference<List<EventListener>>() { - }; - - private TCAAppPreferences tcaAppPreferences; - private boolean stopSendingMessages; - @Property - private final String tcaSubscriberOutputStreamName; - - public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) { - this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; - } - - @Override - public void configure() { - setName("MockTCASubscriberWorker"); - setDescription("Writes Mocked VES messages to CDAP Stream"); - LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker"); - } - - @Override - public void initialize(WorkerContext context) throws Exception { - super.initialize(context); - - final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); - LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences); - this.tcaAppPreferences = appPreferences; - this.stopSendingMessages = false; - } - - - @Override - public void run() { - final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); - LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval); - - final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream - (MOCK_MESSAGE_FILE_LOCATION); - - if (resourceAsStream == null) { - LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION); - throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException()); - } - - - try { - List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE); - - final int totalMessageCount = eventListeners.size(); - LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount); - - int i = 1; - for (EventListener eventListener : eventListeners) { - if (stopSendingMessages) { - LOG.debug("Stop sending messages......"); - break; - } - final String eventListenerString = writeValueAsString(eventListener); - LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount); - getContext().write(tcaSubscriberOutputStreamName, eventListenerString); - i++; - - try { - Thread.sleep(subscriberPollingInterval); - } catch (InterruptedException e) { - LOG.error("Error while sleeping"); - throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e); - } - } - - LOG.debug("Finished writing mock messages to CDAP Stream"); - - } catch (IOException e) { - LOG.error("Error while parsing json file"); - throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e); - } - - - } - - @Override - public void stop() { - stopSendingMessages = true; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.worker.AbstractWorker;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue;
+import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString;
+
+/**
+ * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic.
+ * The mock instead of making DMaaP MR calls will actually take messages
+ * from file and send them to stream at subscriber polling interval
+ *
+ * TODO: To be removed before going to production - only for testing purposes
+ *
+ * @author Rajiv Singla . Creation Date: 11/4/2016.
+ */
+public class TCADMaaPMockSubscriberWorker extends AbstractWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class);
+
+ // TODO: Remove this file before going to production - only for mocking purposes
+ private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
+ private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
+ new TypeReference<List<EventListener>>() {
+ };
+
+ private TCAAppPreferences tcaAppPreferences;
+ private boolean stopSendingMessages;
+ @Property
+ private final String tcaSubscriberOutputStreamName;
+
+ public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) {
+ this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
+ }
+
+ @Override
+ public void configure() {
+ setName("MockTCASubscriberWorker");
+ setDescription("Writes Mocked VES messages to CDAP Stream");
+ LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker");
+ }
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+ LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences);
+ this.tcaAppPreferences = appPreferences;
+ this.stopSendingMessages = false;
+ }
+
+
+ @Override
+ public void run() {
+ final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
+ LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval);
+
+ final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream
+ (MOCK_MESSAGE_FILE_LOCATION);
+
+ if (resourceAsStream == null) {
+ LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
+ throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
+ }
+
+
+ try {
+ List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
+
+ final int totalMessageCount = eventListeners.size();
+ LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
+
+ int i = 1;
+ for (EventListener eventListener : eventListeners) {
+ if (stopSendingMessages) {
+ LOG.debug("Stop sending messages......");
+ break;
+ }
+ final String eventListenerString = writeValueAsString(eventListener);
+ LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
+ getContext().write(tcaSubscriberOutputStreamName, eventListenerString);
+ i++;
+
+ try {
+ Thread.sleep(subscriberPollingInterval);
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping");
+ throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
+ }
+ }
+
+ LOG.debug("Finished writing mock messages to CDAP Stream");
+
+ } catch (IOException e) {
+ LOG.error("Error while parsing json file");
+ throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
+ }
+
+
+ }
+
+ @Override
+ public void stop() {
+ stopSendingMessages = true;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java index 90e458a..42f8c8b 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java @@ -1,146 +1,146 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.metrics.Metrics; -import co.cask.cdap.api.worker.WorkerContext; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToPublisherConfigMapper; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.quartz.JobDataMap; -import org.quartz.SchedulerException; -import org.quartz.impl.StdSchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; - -import static java.lang.String.format; - -/** - * TCA DMaaP Publisher will monitor alerts table at regular intervals and publish any alerts to DMaaP MR Publishing - * Topic - * <p> - * @author Rajiv Singla . Creation Date: 11/16/2016. - */ -public class TCADMaaPPublisherWorker extends BaseTCADMaaPMRWorker { - - private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPPublisherWorker.class); - - private DMaaPMRPublisher publisher; - private Metrics metrics; - @Property - private final String tcaVESAlertsTableName; - - public TCADMaaPPublisherWorker(final String tcaVESAlertsTableName) { - this.tcaVESAlertsTableName = tcaVESAlertsTableName; - } - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); - setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_DESCRIPTION_WORKER); - LOG.debug("Configuring TCA MR DMaaP Publisher worker with name: {}", - CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); - } - - - @Override - public void initialize(WorkerContext context) throws Exception { - super.initialize(context); - - // Parse runtime arguments - final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); - - LOG.info("Initializing TCA MR DMaaP Publisher worker with preferences: {}", tcaAppPreferences); - - // Map TCA App Preferences to DMaaP MR Publisher Config - final DMaaPMRPublisherConfig publisherConfig = AppPreferencesToPublisherConfigMapper.map(tcaAppPreferences); - - LOG.info("TCA DMaaP MR Publisher worker will be polling TCA Alerts Table Name: {}", tcaVESAlertsTableName); - - // Create an instance of DMaaP MR Publisher - LOG.debug("Creating an instance of DMaaP Publisher"); - publisher = DMaaPMRFactory.create().createPublisher(publisherConfig); - - // initialize a new Quartz scheduler - initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); - - // initialize scheduler state - isSchedulerShutdown = new AtomicBoolean(true); - } - - - /** - * Stop DMaaP Publisher - */ - @Override - public void stop() { - // Close Publisher - which will flush any batch messages if present in batch queue - if (publisher != null) { - try { - publisher.close(); - } catch (Exception e) { - final String errorMessage = format("Error while shutting down DMaaP MR Publisher: %s", e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - // Shut down scheduler - super.stop(); - } - - - /** - * Initializes a scheduler instance for DMaaP MR Publisher Job - * - * @throws SchedulerException SchedulerException - */ - private void initializeScheduler(final TCAAppPreferences tcaAnalyticsAppConfig, - final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { - - // Get Publisher polling interval - final Integer publisherPollingInterval = tcaAnalyticsAppConfig.getPublisherPollingInterval(); - - // Publisher Quartz Properties file - final String quartzPublisherPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_PUBLISHER_PROPERTIES_FILE_NAME; - - // Create a new JobDataMap containing information required by TCA DMaaP Publisher Job - final JobDataMap jobDataMap = new JobDataMap(); - jobDataMap.put(AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME, tcaVESAlertsTableName); - jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); - jobDataMap.put(AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME, publisher); - jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); - - // Create new publisher scheduler - scheduler = TCAUtils.createQuartzScheduler(publisherPollingInterval, stdSchedulerFactory, - quartzPublisherPropertiesFileName, jobDataMap, TCADMaaPMRPublisherJob.class, - AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_JOB_NAME, - AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_TRIGGER_NAME); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToPublisherConfigMapper;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.quartz.JobDataMap;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+
+/**
+ * TCA DMaaP Publisher will monitor alerts table at regular intervals and publish any alerts to DMaaP MR Publishing
+ * Topic
+ * <p>
+ * @author Rajiv Singla . Creation Date: 11/16/2016.
+ */
+public class TCADMaaPPublisherWorker extends BaseTCADMaaPMRWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPPublisherWorker.class);
+
+ private DMaaPMRPublisher publisher;
+ private Metrics metrics;
+ @Property
+ private final String tcaVESAlertsTableName;
+
+ public TCADMaaPPublisherWorker(final String tcaVESAlertsTableName) {
+ this.tcaVESAlertsTableName = tcaVESAlertsTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_DESCRIPTION_WORKER);
+ LOG.debug("Configuring TCA MR DMaaP Publisher worker with name: {}",
+ CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER);
+ }
+
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+
+ LOG.info("Initializing TCA MR DMaaP Publisher worker with preferences: {}", tcaAppPreferences);
+
+ // Map TCA App Preferences to DMaaP MR Publisher Config
+ final DMaaPMRPublisherConfig publisherConfig = AppPreferencesToPublisherConfigMapper.map(tcaAppPreferences);
+
+ LOG.info("TCA DMaaP MR Publisher worker will be polling TCA Alerts Table Name: {}", tcaVESAlertsTableName);
+
+ // Create an instance of DMaaP MR Publisher
+ LOG.debug("Creating an instance of DMaaP Publisher");
+ publisher = DMaaPMRFactory.create().createPublisher(publisherConfig);
+
+ // initialize a new Quartz scheduler
+ initializeScheduler(tcaAppPreferences, new StdSchedulerFactory());
+
+ // initialize scheduler state
+ isSchedulerShutdown = new AtomicBoolean(true);
+ }
+
+
+ /**
+ * Stop DMaaP Publisher
+ */
+ @Override
+ public void stop() {
+ // Close Publisher - which will flush any batch messages if present in batch queue
+ if (publisher != null) {
+ try {
+ publisher.close();
+ } catch (Exception e) {
+ final String errorMessage = format("Error while shutting down DMaaP MR Publisher: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+ // Shut down scheduler
+ super.stop();
+ }
+
+
+ /**
+ * Initializes a scheduler instance for DMaaP MR Publisher Job
+ *
+ * @throws SchedulerException SchedulerException
+ */
+ private void initializeScheduler(final TCAAppPreferences tcaAnalyticsAppConfig,
+ final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException {
+
+ // Get Publisher polling interval
+ final Integer publisherPollingInterval = tcaAnalyticsAppConfig.getPublisherPollingInterval();
+
+ // Publisher Quartz Properties file
+ final String quartzPublisherPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_PUBLISHER_PROPERTIES_FILE_NAME;
+
+ // Create a new JobDataMap containing information required by TCA DMaaP Publisher Job
+ final JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME, tcaVESAlertsTableName);
+ jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext());
+ jobDataMap.put(AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME, publisher);
+ jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics);
+
+ // Create new publisher scheduler
+ scheduler = TCAUtils.createQuartzScheduler(publisherPollingInterval, stdSchedulerFactory,
+ quartzPublisherPropertiesFileName, jobDataMap, TCADMaaPMRPublisherJob.class,
+ AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_JOB_NAME,
+ AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_TRIGGER_NAME);
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java index 900d62e..d868ff4 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java @@ -1,124 +1,124 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.cdap.tca.worker; - -import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.metrics.Metrics; -import co.cask.cdap.api.worker.WorkerContext; -import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; -import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToSubscriberConfigMapper; -import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; -import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.quartz.JobDataMap; -import org.quartz.SchedulerException; -import org.quartz.impl.StdSchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * TCA DMaaP Subscriber will read messages and post them to cdap stream at regular intervals - * <p> - * @author Rajiv Singla . Creation Date: 10/14/2016. - */ -public class TCADMaaPSubscriberWorker extends BaseTCADMaaPMRWorker { - - private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPSubscriberWorker.class); - - private DMaaPMRSubscriber subscriber; - private Metrics metrics; - @Property - private final String tcaSubscriberOutputStreamName; - - public TCADMaaPSubscriberWorker(final String tcaSubscriberOutputStreamName) { - this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; - } - - - @Override - public void configure() { - setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); - setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_DESCRIPTION_WORKER); - LOG.debug("Configuring TCA MR DMaaP Subscriber worker with name: {}", - CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); - } - - @Override - public void initialize(WorkerContext context) throws Exception { - super.initialize(context); - - // Parse runtime arguments - final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); - - LOG.info("Initializing TCA MR DMaaP Subscriber worker with preferences: {}", tcaAppPreferences); - - // Map TCA App Preferences to DMaaP MR Subscriber Config - final DMaaPMRSubscriberConfig subscriberConfig = AppPreferencesToSubscriberConfigMapper.map(tcaAppPreferences); - - LOG.info("TCA DMaaP MR Subscriber worker will be writing to CDAP Stream: {}", tcaSubscriberOutputStreamName); - - // Create an instance of DMaaP MR Subscriber - LOG.debug("Creating an instance of DMaaP Subscriber"); - subscriber = DMaaPMRFactory.create().createSubscriber(subscriberConfig); - - // initialize a new Quartz scheduler - initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); - - // initialize scheduler state - isSchedulerShutdown = new AtomicBoolean(true); - } - - /** - * Initializes a scheduler instance for DMaaP MR Subscriber Job - * - * @throws SchedulerException SchedulerException - */ - private void initializeScheduler(final TCAAppPreferences tcaAppPreferences, - final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { - - // Get Subscriber polling interval - final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); - - // Subscriber Quartz Properties file - final String quartzSubscriberPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_SUBSCRIBER_PROPERTIES_FILE_NAME; - - // Create a new JobDataMap containing information required by TCA DMaaP Subscriber Job - final JobDataMap jobDataMap = new JobDataMap(); - jobDataMap.put(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME, tcaSubscriberOutputStreamName); - jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); - jobDataMap.put(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME, subscriber); - jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); - - // Create new publisher scheduler - scheduler = TCAUtils.createQuartzScheduler(subscriberPollingInterval, stdSchedulerFactory, - quartzSubscriberPropertiesFileName, jobDataMap, TCADMaaPMRSubscriberJob.class, - AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_JOB_NAME, - AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_TRIGGER_NAME); - } - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToSubscriberConfigMapper;
+import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.quartz.JobDataMap;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TCA DMaaP Subscriber will read messages and post them to cdap stream at regular intervals
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/14/2016.
+ */
+public class TCADMaaPSubscriberWorker extends BaseTCADMaaPMRWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPSubscriberWorker.class);
+
+ private DMaaPMRSubscriber subscriber;
+ private Metrics metrics;
+ @Property
+ private final String tcaSubscriberOutputStreamName;
+
+ public TCADMaaPSubscriberWorker(final String tcaSubscriberOutputStreamName) {
+ this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
+ }
+
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_DESCRIPTION_WORKER);
+ LOG.debug("Configuring TCA MR DMaaP Subscriber worker with name: {}",
+ CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER);
+ }
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+
+ LOG.info("Initializing TCA MR DMaaP Subscriber worker with preferences: {}", tcaAppPreferences);
+
+ // Map TCA App Preferences to DMaaP MR Subscriber Config
+ final DMaaPMRSubscriberConfig subscriberConfig = AppPreferencesToSubscriberConfigMapper.map(tcaAppPreferences);
+
+ LOG.info("TCA DMaaP MR Subscriber worker will be writing to CDAP Stream: {}", tcaSubscriberOutputStreamName);
+
+ // Create an instance of DMaaP MR Subscriber
+ LOG.debug("Creating an instance of DMaaP Subscriber");
+ subscriber = DMaaPMRFactory.create().createSubscriber(subscriberConfig);
+
+ // initialize a new Quartz scheduler
+ initializeScheduler(tcaAppPreferences, new StdSchedulerFactory());
+
+ // initialize scheduler state
+ isSchedulerShutdown = new AtomicBoolean(true);
+ }
+
+ /**
+ * Initializes a scheduler instance for DMaaP MR Subscriber Job
+ *
+ * @throws SchedulerException SchedulerException
+ */
+ private void initializeScheduler(final TCAAppPreferences tcaAppPreferences,
+ final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException {
+
+ // Get Subscriber polling interval
+ final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
+
+ // Subscriber Quartz Properties file
+ final String quartzSubscriberPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_SUBSCRIBER_PROPERTIES_FILE_NAME;
+
+ // Create a new JobDataMap containing information required by TCA DMaaP Subscriber Job
+ final JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME, tcaSubscriberOutputStreamName);
+ jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext());
+ jobDataMap.put(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME, subscriber);
+ jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics);
+
+ // Create new publisher scheduler
+ scheduler = TCAUtils.createQuartzScheduler(subscriberPollingInterval, stdSchedulerFactory,
+ quartzSubscriberPropertiesFileName, jobDataMap, TCADMaaPMRSubscriberJob.class,
+ AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_JOB_NAME,
+ AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_TRIGGER_NAME);
+ }
+
+
+}
|