From 4edf0fa48892bf38d0e3838d125e9bf2324b38e0 Mon Sep 17 00:00:00 2001 From: an4828 Date: Fri, 15 Sep 2017 15:28:53 -0400 Subject: Add support for ABATED alerts within CDAP TCA Change-Id: Iae560a2d0a47b30b41cd31206dc481a08e4930f7 Signed-off-by: an4828 Issue-ID: DCAEGEN2-107 Signed-off-by: an4828 --- .../cdap/tca/TCAAnalyticsApplication.java | 11 ++ .../cdap/tca/flow/TCAVESCollectorFlow.java | 11 +- .../tca/flowlet/TCAVESAlertsAbatementFlowlet.java | 163 +++++++++++++++++++++ .../cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java | 2 +- .../tca/flowlet/TCAVESMessageRouterFlowlet.java | 6 +- .../TCAVESThresholdViolationCalculatorFlowlet.java | 15 +- .../analytics/cdap/tca/settings/TCAAppConfig.java | 14 ++ .../cdap/tca/settings/TCAAppPreferences.java | 49 +++++++ .../analytics/cdap/tca/utils/CDAPTCAUtils.java | 147 +++++++++++++++++-- .../validator/TCAPolicyPreferencesValidator.java | 77 ++++++---- 10 files changed, 444 insertions(+), 51 deletions(-) create mode 100644 dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java (limited to 'dcae-analytics-cdap-tca/src/main/java') diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java index 1a8cb5e..8c634a7 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java @@ -25,6 +25,7 @@ import co.cask.cdap.api.data.stream.Stream; import co.cask.cdap.api.dataset.DatasetProperties; import co.cask.cdap.api.dataset.lib.ObjectMappedTable; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; @@ -79,6 +80,16 @@ public class TCAAnalyticsApplication extends AbstractApplication { TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds); createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties); + + // Create TCA Alerts Abatement Table + final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName(); + final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds(); + LOG.info("Creating Alerts Abatement Table: {} with TTL: {}", + tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds); + final DatasetProperties alertsAbatementTableProperties = + TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds); + createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties); + // Create TCA VES Alerts Table final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName(); final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds(); diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java index d880a12..4df35e7 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java @@ -22,6 +22,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.flow; import co.cask.cdap.api.flow.AbstractFlow; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet; import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet; import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet; import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet; @@ -53,6 +54,10 @@ public class TCAVESCollectorFlow extends AbstractFlow { new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName()); addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances()); + final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet = + new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName()); + addFlowlet(tcavesAlertsAbatementFlowlet); + final TCAVESAlertsSinkFlowlet alertsSinkFlowlet = new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName()); addFlowlet(alertsSinkFlowlet); @@ -62,8 +67,10 @@ public class TCAVESCollectorFlow extends AbstractFlow { connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet); // connect message router to VES threshold calculator connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet); - // connect VES threshold calculator flowlet to Alerts Sink Flowlet - connect(thresholdViolationCalculatorFlowlet, alertsSinkFlowlet); + // connect VES threshold calculator flowlet to Alerts Abatement Flowlet + connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet); + // connect Alerts Abatement flowlet to Alerts Sink Flowlet + connect(tcavesAlertsAbatementFlowlet, alertsSinkFlowlet); } } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java new file mode 100644 index 0000000..543fc9e --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java @@ -0,0 +1,163 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +/** + * Flowlet responsible to sending out abatement alerts + * + * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017. + */ +public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class); + + @Property + private final String tcaAlertsAbatementTableName; + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) + protected OutputEmitter alertsAbatementOutputEmitter; + + private ObjectMappedTable tcaAlertsAbatementTable; + + public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) { + this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName); + } + + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws Exception { + + final String cefMessage = thresholdCalculatorOutput.getCefMessage(); + final String alertMessageString = thresholdCalculatorOutput.getAlertMessage(); + final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName(); + + // alerts must have violated metrics per event name present + if (StringUtils.isBlank(violatedMetricsPerEventNameString)) { + final String errorMessage = String.format( + "No violated metricsPerEventName found for VES Message: %s." + + "Ignored alert message: %s", cefMessage, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + + final MetricsPerEventName violatedMetricsPerEventName = + TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class); + final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class); + final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); + final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); + final ControlLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus(); + + switch (closedLoopEventStatus) { + + case ONSET: + + LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage); + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + null, tcaAlertsAbatementTable); + LOG.info("Emitting ONSET alert: {}", alertMessageString); + alertsAbatementOutputEmitter.emit(alertMessageString); + break; + + case ABATED: + + LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold); + final TCAAlertsAbatementEntity previousAlertsAbatementEntry = + TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, + tcaAlertsAbatementTable); + + if (previousAlertsAbatementEntry != null) { + + LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry); + + final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS(); + if (abatementSentTS != null) { + LOG.debug("Abatement alert was already sent at timestamp: {}. " + + "Skip resending this abatement alert again", abatementSentTS); + } else { + + final long newAbatementSentTS = new Date().getTime(); + LOG.debug( + "No abatement alert was sent before." + + "Sending abatement alert:{} for the first time at:{}", + alertMessageString, newAbatementSentTS); + + // save new Abatement alert sent timestamp in table + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + Long.toString(newAbatementSentTS), tcaAlertsAbatementTable); + LOG.info("Emitting ABATED alert: {}", alertMessageString); + alertsAbatementOutputEmitter.emit(alertMessageString); + + } + + } else { + LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.", + alertMessageString); + } + + break; + + default: + + final String errorMessage = String.format( + "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + + "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + + } + + + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java index c2da943..90e8fc2 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java @@ -62,7 +62,7 @@ public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet { * * @param alertMessage alert message */ - @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) public void saveAlerts(String alertMessage) { // Saves alert message in alerts table TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable); diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java index 3023c90..b32173b 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java @@ -27,8 +27,7 @@ import co.cask.cdap.api.flow.flowlet.OutputEmitter; import co.cask.cdap.api.flow.flowlet.StreamEvent; import com.google.common.base.Charsets; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; - -import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; /** @@ -54,6 +53,7 @@ public class TCAVESMessageRouterFlowlet extends AbstractFlowlet { @ProcessInput public void routeVESMessage(StreamEvent vesMessageStreamEvent) { final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString(); - vesMessageEmitter.emit(vesMessage, TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); + vesMessageEmitter.emit( + vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); } } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java index b8460dc..a3a35cf 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -20,10 +20,10 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; +import co.cask.cdap.api.annotation.HashPartition; import co.cask.cdap.api.annotation.Output; import co.cask.cdap.api.annotation.ProcessInput; import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.annotation.RoundRobin; import co.cask.cdap.api.dataset.lib.ObjectMappedTable; import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; import co.cask.cdap.api.flow.flowlet.FlowletContext; @@ -32,10 +32,12 @@ import co.cask.cdap.api.metrics.Metrics; import com.fasterxml.jackson.core.JsonProcessingException; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; @@ -54,7 +56,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) - protected OutputEmitter tcaAlertOutputEmitter; + protected OutputEmitter tcaAlertOutputEmitter; protected Metrics metrics; private ObjectMappedTable vesMessageStatusTable; @@ -100,7 +102,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { * @throws JsonProcessingException if alert message cannot be parsed into JSON object */ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) - @RoundRobin + @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY) public void filterVESMessages(String vesMessage) throws JsonProcessingException { TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; @@ -127,7 +129,12 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); // Step 4: Emit message to Alert Sink Flowlet - tcaAlertOutputEmitter.emit(alertMessage); + final ThresholdCalculatorOutput thresholdCalculatorOutput = + new ThresholdCalculatorOutput(processorContext.getMessage(), + TCAUtils.writeValueAsString(processorContext.getTCAPolicy()), + TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()), + alertMessage); + tcaAlertOutputEmitter.emit(thresholdCalculatorOutput); } else { diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java index 78ef877..5d74f01 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java @@ -43,6 +43,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig { protected Integer tcaVESMessageStatusTableTTLSeconds; protected String tcaVESAlertsTableName; protected Integer tcaVESAlertsTableTTLSeconds; + protected String tcaAlertsAbatementTableName; + protected Integer tcaAlertsAbatementTableTTLSeconds; public TCAAppConfig() { @@ -54,6 +56,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig { tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE; tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE; tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE; + tcaAlertsAbatementTableName = CDAPComponentsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE; + tcaAlertsAbatementTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_TTL_TABLE; } public String getTcaSubscriberOutputStreamName() { @@ -80,6 +84,14 @@ public class TCAAppConfig extends CDAPBaseAppConfig { return thresholdCalculatorFlowletInstances; } + public String getTcaAlertsAbatementTableName() { + return tcaAlertsAbatementTableName; + } + + public Integer getTcaAlertsAbatementTableTTLSeconds() { + return tcaAlertsAbatementTableTTLSeconds; + } + @Override public String toString() { return Objects.toStringHelper(this) @@ -91,6 +103,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig { .add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds) .add("tcaVESAlertsTableName", tcaVESAlertsTableName) .add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds) + .add("tcaAlertsAbatementTableName", tcaAlertsAbatementTableName) + .add("tcaAlertsAbatementTableTTLSeconds", tcaAlertsAbatementTableTTLSeconds) .toString(); } } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java index c29b7ce..2f86b37 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java @@ -195,6 +195,55 @@ public class TCAAppPreferences implements CDAPAppPreferences { return enableAlertCEFFormat; } + + public void setSubscriberHostName(String subscriberHostName) { + this.subscriberHostName = subscriberHostName; + } + + public void setSubscriberHostPort(Integer subscriberHostPort) { + this.subscriberHostPort = subscriberHostPort; + } + + public void setSubscriberTopicName(String subscriberTopicName) { + this.subscriberTopicName = subscriberTopicName; + } + + public void setSubscriberProtocol(String subscriberProtocol) { + this.subscriberProtocol = subscriberProtocol; + } + + public void setSubscriberUserName(String subscriberUserName) { + this.subscriberUserName = subscriberUserName; + } + + public void setSubscriberUserPassword(String subscriberUserPassword) { + this.subscriberUserPassword = subscriberUserPassword; + } + + public void setPublisherHostName(String publisherHostName) { + this.publisherHostName = publisherHostName; + } + + public void setPublisherHostPort(Integer publisherHostPort) { + this.publisherHostPort = publisherHostPort; + } + + public void setPublisherTopicName(String publisherTopicName) { + this.publisherTopicName = publisherTopicName; + } + + public void setPublisherProtocol(String publisherProtocol) { + this.publisherProtocol = publisherProtocol; + } + + public void setPublisherUserName(String publisherUserName) { + this.publisherUserName = publisherUserName; + } + + public void setPublisherUserPassword(String publisherUserPassword) { + this.publisherUserPassword = publisherUserPassword; + } + @Override public String toString() { return Objects.toStringHelper(this) diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java index 29d42d5..89c5a84 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java @@ -23,6 +23,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.utils; import co.cask.cdap.api.RuntimeContext; import com.google.common.base.Function; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; @@ -30,12 +31,19 @@ import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator; import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator; import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo; +import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig; +import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn; +import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.util.Collection; import java.util.List; import java.util.Map; @@ -80,12 +88,123 @@ public abstract class CDAPTCAUtils extends TCAUtils { final TCAAppPreferences tcaAppPreferences = ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class); + final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration(); + + // populate DMaaP Information from App Config String + populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences); + // Validate runtime arguments validateSettings(tcaAppPreferences, new TCAPreferencesValidator()); return tcaAppPreferences; } + /** + * Populated App Preferences DMaaP Information from Application Config String + * + * @param appConfigString CDAP Application config String + * @param tcaAppPreferences TCA App Preferences + */ + private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString, + final TCAAppPreferences tcaAppPreferences) { + + if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) { + LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config"); + } + + LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString); + + try { + final TCAControllerAppConfig tcaControllerAppConfig = + readValue(appConfigString, TCAControllerAppConfig.class); + + // Parse Subscriber DMaaP information from App Config String + if (tcaControllerAppConfig.getStreamsSubscribes() != null && + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null && + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) { + + final DMAAPInfo subscriberDmaapInfo = + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo(); + LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl()); + final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl()); + tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol()); + tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost()); + final int subscriberUrlPort = subscriberUrl.getPort() != -1 ? + subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol()); + tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort); + tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8)); + + final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn(); + tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName()); + tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword()); + } else { + LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString); + } + + + // Parse Publisher DMaaP information from App Config String + if (tcaControllerAppConfig.getStreamsPublishes() != null && + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null && + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) { + + final DMAAPInfo publisherDmaapInfo = + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo(); + LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl()); + final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl()); + tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol()); + tcaAppPreferences.setPublisherHostName(publisherUrl.getHost()); + final int publisherUrlPort = publisherUrl.getPort() != -1 ? + publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol()); + tcaAppPreferences.setPublisherHostPort(publisherUrlPort); + tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8)); + + final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut(); + tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName()); + tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword()); + } else { + LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString); + } + + + } catch (IOException e) { + throw new CDAPSettingsException( + "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e); + } + } + + /** + * Parses provided DMaaP MR URL string to {@link URL} object + * + * @param urlString url string + * + * @return url object + */ + private static URL parseURL(final String urlString) { + try { + return new URL(urlString); + } catch (MalformedURLException e) { + final String errorMessage = String.format("Invalid URL format: %s", urlString); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + /** + * Sets up default DMaaP Port if not provided with DMaaP URL + * + * @param protocol protocol e.g. http or https + * + * @return default DMaaP MR port number + */ + private static int getDefaultDMaaPPort(final String protocol) { + if ("http".equals(protocol)) { + return 3904; + } else if ("https".equals(protocol)) { + return 3905; + } else { + return 80; + } + } + /** * Extracts alert message strings from {@link TCAVESAlertEntity} @@ -118,28 +237,32 @@ public abstract class CDAPTCAUtils extends TCAUtils { TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences(); - final String tcaPolicy = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY); + final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY); - if (tcaPolicy != null) { + if (StringUtils.isNotBlank(tcaPolicyJsonString)) { - LOG.debug(" tcaPolicy is being read from JSON String"); + LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}", + AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString); // initialize unquotedTCAPolicy - String unquotedTCAPolicy = tcaPolicy; + String unquotedTCAPolicy = tcaPolicyJsonString.trim(); - //remove starting and ending quote from tcaPolicy - if (tcaPolicy.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && tcaPolicy.trim().endsWith - (AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) { - unquotedTCAPolicy = tcaPolicy.trim().substring(1, tcaPolicy.trim().length() - 1); + //remove starting and ending quote from passed tca policy Json string if present + if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && + tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) { + unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1); } try { tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class); } catch (IOException e) { - throw new CDAPSettingsException("Invalid tca policy format", LOG, e); + throw new CDAPSettingsException( + "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e); } - } else { // old controller is being used. Validate preferences as received from old controller + } else { // classical controller is being used. Validate preferences as received from classical controller + + LOG.info("TcaPolicy is being parsed as key value pair from classical controller"); // extract TCA Policy Domain from Runtime Arguments final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH); @@ -156,8 +279,8 @@ public abstract class CDAPTCAUtils extends TCAUtils { extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); // create metrics per functional role list - tcaPolicyPreferences.setMetricsPerFunctionalRole( - createTCAPolicyMetricsPerFunctionalRoleList(functionalRolesMap)); + tcaPolicyPreferences.setMetricsPerEventName( + createTCAPolicyMetricsPerEventNameList(functionalRolesMap)); } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java index 858204a..ceb1857 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java @@ -24,8 +24,10 @@ import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsV import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; @@ -36,6 +38,7 @@ import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtil /** * Validates TCA Policy Preferences *

+ * * @author Rajiv Singla . Creation Date: 11/29/2016. */ public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator policyFunctionalRoles = TCAUtils.getPolicyFunctionalRoles(tcaPolicyPreferences); - if (policyFunctionalRoles.isEmpty()) { - validationResponse.addErrorMessage("metricsPerFunctionalRoles", - "TCA Policy must have at least one or more functional roles"); + // validate TCA Policy must have at least one event name + final List policyEventNames = TCAUtils.getPolicyEventNames(tcaPolicyPreferences); + if (policyEventNames.isEmpty()) { + validationResponse.addErrorMessage("metricsPerEventNames", + "TCA Policy must have at least one or more event names"); } - final List metricsPerFunctionalRoles = - tcaPolicyPreferences.getMetricsPerFunctionalRole(); + final List metricsPerEventNames = + tcaPolicyPreferences.getMetricsPerEventName(); - // validate each Functional Role must have at least one threshold - for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) { - if (metricsPerFunctionalRole.getThresholds().isEmpty()) { - validationResponse.addErrorMessage("thresholds", - "TCA Policy Functional Role must have at least one threshold. " + - "Functional Role causing this validation error:" + metricsPerFunctionalRole); + // validate Metrics Per Event Name + for (MetricsPerEventName metricsPerEventName : metricsPerEventNames) { + + // event name must be present + final String eventName = metricsPerEventName.getEventName(); + if (isEmpty(eventName)) { + validationResponse.addErrorMessage("eventName", + "TCA Policy eventName is not present for metricsPerEventName:" + metricsPerEventName); } - } - // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity - for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) { - final List functionalRoleThresholds = metricsPerFunctionalRole.getThresholds(); - for (Threshold functionalRoleThreshold : functionalRoleThresholds) { - final String fieldPath = functionalRoleThreshold.getFieldPath(); - final Long thresholdValue = functionalRoleThreshold.getThresholdValue(); - final Direction direction = functionalRoleThreshold.getDirection(); - final EventSeverity severity = functionalRoleThreshold.getSeverity(); - if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null) { - validationResponse.addErrorMessage("threshold", - "TCA Policy threshold must have fieldPath, thresholdValue, direction and severity present." - + "Threshold causing this validation error:" + functionalRoleThreshold); + // control Loop Schema type must be present + final ControlLoopSchemaType controlLoopSchemaType = metricsPerEventName.getControlLoopSchemaType(); + if (controlLoopSchemaType == null) { + validationResponse.addErrorMessage("controlLoopEventType", + "TCA Policy controlLoopSchemaType is not present for metricsPerEventName:" + + metricsPerEventName); + } + + // must have at least 1 threshold defined + if (metricsPerEventName.getThresholds() == null || metricsPerEventName.getThresholds().isEmpty()) { + validationResponse.addErrorMessage("thresholds", + "TCA Policy event Name must have at least one threshold. " + + "Event Name causing this validation error:" + metricsPerEventName); + } else { + // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity + final List eventNameThresholds = metricsPerEventName.getThresholds(); + for (Threshold eventNameThreshold : eventNameThresholds) { + final String fieldPath = eventNameThreshold.getFieldPath(); + final Long thresholdValue = eventNameThreshold.getThresholdValue(); + final Direction direction = eventNameThreshold.getDirection(); + final EventSeverity severity = eventNameThreshold.getSeverity(); + final ControlLoopEventStatus closedLoopEventStatus = eventNameThreshold.getClosedLoopEventStatus(); + if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null || + closedLoopEventStatus == null) { + validationResponse.addErrorMessage("threshold", + "TCA Policy threshold must have fieldPath,thresholdValue,direction, " + + "closedLoopEventStatus and severity defined." + + "Threshold causing this validation error:" + eventNameThreshold); + } } } } - - return validationResponse; } } -- cgit 1.2.3-korg