aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap')
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java11
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java11
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java163
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java2
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java6
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java15
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java14
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java49
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java147
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java77
10 files changed, 444 insertions, 51 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java
index 1a8cb5e..8c634a7 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java
@@ -25,6 +25,7 @@ import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
@@ -79,6 +80,16 @@ public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
+
+ // Create TCA Alerts Abatement Table
+ final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName();
+ final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds();
+ LOG.info("Creating Alerts Abatement Table: {} with TTL: {}",
+ tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds);
+ final DatasetProperties alertsAbatementTableProperties =
+ TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds);
+ createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties);
+
// Create TCA VES Alerts Table
final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java
index d880a12..4df35e7 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java
@@ -22,6 +22,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.flow;
import co.cask.cdap.api.flow.AbstractFlow;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet;
import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet;
import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet;
import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet;
@@ -53,6 +54,10 @@ public class TCAVESCollectorFlow extends AbstractFlow {
new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName());
addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances());
+ final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet =
+ new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName());
+ addFlowlet(tcavesAlertsAbatementFlowlet);
+
final TCAVESAlertsSinkFlowlet alertsSinkFlowlet =
new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName());
addFlowlet(alertsSinkFlowlet);
@@ -62,8 +67,10 @@ public class TCAVESCollectorFlow extends AbstractFlow {
connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet);
// connect message router to VES threshold calculator
connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet);
- // connect VES threshold calculator flowlet to Alerts Sink Flowlet
- connect(thresholdViolationCalculatorFlowlet, alertsSinkFlowlet);
+ // connect VES threshold calculator flowlet to Alerts Abatement Flowlet
+ connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet);
+ // connect Alerts Abatement flowlet to Alerts Sink Flowlet
+ connect(tcavesAlertsAbatementFlowlet, alertsSinkFlowlet);
}
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
new file mode 100644
index 0000000..543fc9e
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
@@ -0,0 +1,163 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+/**
+ * Flowlet responsible to sending out abatement alerts
+ *
+ * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017.
+ */
+public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
+
+ @Property
+ private final String tcaAlertsAbatementTableName;
+
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
+ protected OutputEmitter<String> alertsAbatementOutputEmitter;
+
+ private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
+
+ public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
+ this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
+ }
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+ tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+ }
+
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
+ public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws Exception {
+
+ final String cefMessage = thresholdCalculatorOutput.getCefMessage();
+ final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
+ final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
+
+ // alerts must have violated metrics per event name present
+ if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
+ final String errorMessage = String.format(
+ "No violated metricsPerEventName found for VES Message: %s." +
+ "Ignored alert message: %s", cefMessage, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+ }
+
+ final MetricsPerEventName violatedMetricsPerEventName =
+ TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
+ final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
+ final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
+ final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
+ final ControlLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
+
+ switch (closedLoopEventStatus) {
+
+ case ONSET:
+
+ LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ null, tcaAlertsAbatementTable);
+ LOG.info("Emitting ONSET alert: {}", alertMessageString);
+ alertsAbatementOutputEmitter.emit(alertMessageString);
+ break;
+
+ case ABATED:
+
+ LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
+ final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
+ TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
+ tcaAlertsAbatementTable);
+
+ if (previousAlertsAbatementEntry != null) {
+
+ LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
+
+ final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
+ if (abatementSentTS != null) {
+ LOG.debug("Abatement alert was already sent at timestamp: {}. " +
+ "Skip resending this abatement alert again", abatementSentTS);
+ } else {
+
+ final long newAbatementSentTS = new Date().getTime();
+ LOG.debug(
+ "No abatement alert was sent before." +
+ "Sending abatement alert:{} for the first time at:{}",
+ alertMessageString, newAbatementSentTS);
+
+ // save new Abatement alert sent timestamp in table
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+ LOG.info("Emitting ABATED alert: {}", alertMessageString);
+ alertsAbatementOutputEmitter.emit(alertMessageString);
+
+ }
+
+ } else {
+ LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
+ alertMessageString);
+ }
+
+ break;
+
+ default:
+
+ final String errorMessage = String.format(
+ "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
+ "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+
+ }
+
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java
index c2da943..90e8fc2 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java
@@ -62,7 +62,7 @@ public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet {
*
* @param alertMessage alert message
*/
- @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
public void saveAlerts(String alertMessage) {
// Saves alert message in alerts table
TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable);
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java
index 3023c90..b32173b 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java
@@ -27,8 +27,7 @@ import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import com.google.common.base.Charsets;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
-
-import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
/**
@@ -54,6 +53,7 @@ public class TCAVESMessageRouterFlowlet extends AbstractFlowlet {
@ProcessInput
public void routeVESMessage(StreamEvent vesMessageStreamEvent) {
final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString();
- vesMessageEmitter.emit(vesMessage, TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode());
+ vesMessageEmitter.emit(
+ vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode());
}
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
index b8460dc..a3a35cf 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
@@ -20,10 +20,10 @@
package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+import co.cask.cdap.api.annotation.HashPartition;
import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.Property;
-import co.cask.cdap.api.annotation.RoundRobin;
import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
@@ -32,10 +32,12 @@ import co.cask.cdap.api.metrics.Metrics;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity;
import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
@@ -54,7 +56,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class);
@Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
- protected OutputEmitter<String> tcaAlertOutputEmitter;
+ protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter;
protected Metrics metrics;
private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable;
@@ -100,7 +102,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
* @throws JsonProcessingException if alert message cannot be parsed into JSON object
*/
@ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
- @RoundRobin
+ @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY)
public void filterVESMessages(String vesMessage) throws JsonProcessingException {
TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
@@ -127,7 +129,12 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
// Step 4: Emit message to Alert Sink Flowlet
- tcaAlertOutputEmitter.emit(alertMessage);
+ final ThresholdCalculatorOutput thresholdCalculatorOutput =
+ new ThresholdCalculatorOutput(processorContext.getMessage(),
+ TCAUtils.writeValueAsString(processorContext.getTCAPolicy()),
+ TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()),
+ alertMessage);
+ tcaAlertOutputEmitter.emit(thresholdCalculatorOutput);
} else {
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java
index 78ef877..5d74f01 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java
@@ -43,6 +43,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig {
protected Integer tcaVESMessageStatusTableTTLSeconds;
protected String tcaVESAlertsTableName;
protected Integer tcaVESAlertsTableTTLSeconds;
+ protected String tcaAlertsAbatementTableName;
+ protected Integer tcaAlertsAbatementTableTTLSeconds;
public TCAAppConfig() {
@@ -54,6 +56,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig {
tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE;
tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE;
tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE;
+ tcaAlertsAbatementTableName = CDAPComponentsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE;
+ tcaAlertsAbatementTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_TTL_TABLE;
}
public String getTcaSubscriberOutputStreamName() {
@@ -80,6 +84,14 @@ public class TCAAppConfig extends CDAPBaseAppConfig {
return thresholdCalculatorFlowletInstances;
}
+ public String getTcaAlertsAbatementTableName() {
+ return tcaAlertsAbatementTableName;
+ }
+
+ public Integer getTcaAlertsAbatementTableTTLSeconds() {
+ return tcaAlertsAbatementTableTTLSeconds;
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this)
@@ -91,6 +103,8 @@ public class TCAAppConfig extends CDAPBaseAppConfig {
.add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds)
.add("tcaVESAlertsTableName", tcaVESAlertsTableName)
.add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds)
+ .add("tcaAlertsAbatementTableName", tcaAlertsAbatementTableName)
+ .add("tcaAlertsAbatementTableTTLSeconds", tcaAlertsAbatementTableTTLSeconds)
.toString();
}
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
index c29b7ce..2f86b37 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
@@ -195,6 +195,55 @@ public class TCAAppPreferences implements CDAPAppPreferences {
return enableAlertCEFFormat;
}
+
+ public void setSubscriberHostName(String subscriberHostName) {
+ this.subscriberHostName = subscriberHostName;
+ }
+
+ public void setSubscriberHostPort(Integer subscriberHostPort) {
+ this.subscriberHostPort = subscriberHostPort;
+ }
+
+ public void setSubscriberTopicName(String subscriberTopicName) {
+ this.subscriberTopicName = subscriberTopicName;
+ }
+
+ public void setSubscriberProtocol(String subscriberProtocol) {
+ this.subscriberProtocol = subscriberProtocol;
+ }
+
+ public void setSubscriberUserName(String subscriberUserName) {
+ this.subscriberUserName = subscriberUserName;
+ }
+
+ public void setSubscriberUserPassword(String subscriberUserPassword) {
+ this.subscriberUserPassword = subscriberUserPassword;
+ }
+
+ public void setPublisherHostName(String publisherHostName) {
+ this.publisherHostName = publisherHostName;
+ }
+
+ public void setPublisherHostPort(Integer publisherHostPort) {
+ this.publisherHostPort = publisherHostPort;
+ }
+
+ public void setPublisherTopicName(String publisherTopicName) {
+ this.publisherTopicName = publisherTopicName;
+ }
+
+ public void setPublisherProtocol(String publisherProtocol) {
+ this.publisherProtocol = publisherProtocol;
+ }
+
+ public void setPublisherUserName(String publisherUserName) {
+ this.publisherUserName = publisherUserName;
+ }
+
+ public void setPublisherUserPassword(String publisherUserPassword) {
+ this.publisherUserPassword = publisherUserPassword;
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this)
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java
index 29d42d5..89c5a84 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java
@@ -23,6 +23,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.utils;
import co.cask.cdap.api.RuntimeContext;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
@@ -30,12 +31,19 @@ import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator;
import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator;
import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -80,12 +88,123 @@ public abstract class CDAPTCAUtils extends TCAUtils {
final TCAAppPreferences tcaAppPreferences =
ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class);
+ final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration();
+
+ // populate DMaaP Information from App Config String
+ populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences);
+
// Validate runtime arguments
validateSettings(tcaAppPreferences, new TCAPreferencesValidator());
return tcaAppPreferences;
}
+ /**
+ * Populated App Preferences DMaaP Information from Application Config String
+ *
+ * @param appConfigString CDAP Application config String
+ * @param tcaAppPreferences TCA App Preferences
+ */
+ private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,
+ final TCAAppPreferences tcaAppPreferences) {
+
+ if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {
+ LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");
+ }
+
+ LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);
+
+ try {
+ final TCAControllerAppConfig tcaControllerAppConfig =
+ readValue(appConfigString, TCAControllerAppConfig.class);
+
+ // Parse Subscriber DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsSubscribes() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {
+
+ final DMAAPInfo subscriberDmaapInfo =
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();
+ LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());
+ final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());
+ final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?
+ subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);
+ tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));
+
+ final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();
+ tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());
+ tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ // Parse Publisher DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsPublishes() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {
+
+ final DMAAPInfo publisherDmaapInfo =
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();
+ LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());
+ final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());
+ final int publisherUrlPort = publisherUrl.getPort() != -1 ?
+ publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostPort(publisherUrlPort);
+ tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));
+
+ final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();
+ tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());
+ tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);
+ }
+ }
+
+ /**
+ * Parses provided DMaaP MR URL string to {@link URL} object
+ *
+ * @param urlString url string
+ *
+ * @return url object
+ */
+ private static URL parseURL(final String urlString) {
+ try {
+ return new URL(urlString);
+ } catch (MalformedURLException e) {
+ final String errorMessage = String.format("Invalid URL format: %s", urlString);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ /**
+ * Sets up default DMaaP Port if not provided with DMaaP URL
+ *
+ * @param protocol protocol e.g. http or https
+ *
+ * @return default DMaaP MR port number
+ */
+ private static int getDefaultDMaaPPort(final String protocol) {
+ if ("http".equals(protocol)) {
+ return 3904;
+ } else if ("https".equals(protocol)) {
+ return 3905;
+ } else {
+ return 80;
+ }
+ }
+
/**
* Extracts alert message strings from {@link TCAVESAlertEntity}
@@ -118,28 +237,32 @@ public abstract class CDAPTCAUtils extends TCAUtils {
TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences();
- final String tcaPolicy = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
+ final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
- if (tcaPolicy != null) {
+ if (StringUtils.isNotBlank(tcaPolicyJsonString)) {
- LOG.debug(" tcaPolicy is being read from JSON String");
+ LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}",
+ AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString);
// initialize unquotedTCAPolicy
- String unquotedTCAPolicy = tcaPolicy;
+ String unquotedTCAPolicy = tcaPolicyJsonString.trim();
- //remove starting and ending quote from tcaPolicy
- if (tcaPolicy.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && tcaPolicy.trim().endsWith
- (AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
- unquotedTCAPolicy = tcaPolicy.trim().substring(1, tcaPolicy.trim().length() - 1);
+ //remove starting and ending quote from passed tca policy Json string if present
+ if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) &&
+ tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
+ unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1);
}
try {
tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class);
} catch (IOException e) {
- throw new CDAPSettingsException("Invalid tca policy format", LOG, e);
+ throw new CDAPSettingsException(
+ "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e);
}
- } else { // old controller is being used. Validate preferences as received from old controller
+ } else { // classical controller is being used. Validate preferences as received from classical controller
+
+ LOG.info("TcaPolicy is being parsed as key value pair from classical controller");
// extract TCA Policy Domain from Runtime Arguments
final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH);
@@ -156,8 +279,8 @@ public abstract class CDAPTCAUtils extends TCAUtils {
extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER);
// create metrics per functional role list
- tcaPolicyPreferences.setMetricsPerFunctionalRole(
- createTCAPolicyMetricsPerFunctionalRoleList(functionalRolesMap));
+ tcaPolicyPreferences.setMetricsPerEventName(
+ createTCAPolicyMetricsPerEventNameList(functionalRolesMap));
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java
index 858204a..ceb1857 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java
@@ -24,8 +24,10 @@ import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsV
import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse;
import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
@@ -36,6 +38,7 @@ import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtil
/**
* Validates TCA Policy Preferences
* <p>
+ *
* @author Rajiv Singla . Creation Date: 11/29/2016.
*/
public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<TCAPolicyPreferences,
@@ -55,42 +58,58 @@ public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<T
validationResponse.addErrorMessage("domain", "TCA Policy must have only one domain present");
}
- // validate TCA Policy must have at least one functional role
- final List<String> policyFunctionalRoles = TCAUtils.getPolicyFunctionalRoles(tcaPolicyPreferences);
- if (policyFunctionalRoles.isEmpty()) {
- validationResponse.addErrorMessage("metricsPerFunctionalRoles",
- "TCA Policy must have at least one or more functional roles");
+ // validate TCA Policy must have at least one event name
+ final List<String> policyEventNames = TCAUtils.getPolicyEventNames(tcaPolicyPreferences);
+ if (policyEventNames.isEmpty()) {
+ validationResponse.addErrorMessage("metricsPerEventNames",
+ "TCA Policy must have at least one or more event names");
}
- final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles =
- tcaPolicyPreferences.getMetricsPerFunctionalRole();
+ final List<MetricsPerEventName> metricsPerEventNames =
+ tcaPolicyPreferences.getMetricsPerEventName();
- // validate each Functional Role must have at least one threshold
- for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) {
- if (metricsPerFunctionalRole.getThresholds().isEmpty()) {
- validationResponse.addErrorMessage("thresholds",
- "TCA Policy Functional Role must have at least one threshold. " +
- "Functional Role causing this validation error:" + metricsPerFunctionalRole);
+ // validate Metrics Per Event Name
+ for (MetricsPerEventName metricsPerEventName : metricsPerEventNames) {
+
+ // event name must be present
+ final String eventName = metricsPerEventName.getEventName();
+ if (isEmpty(eventName)) {
+ validationResponse.addErrorMessage("eventName",
+ "TCA Policy eventName is not present for metricsPerEventName:" + metricsPerEventName);
}
- }
- // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity
- for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) {
- final List<Threshold> functionalRoleThresholds = metricsPerFunctionalRole.getThresholds();
- for (Threshold functionalRoleThreshold : functionalRoleThresholds) {
- final String fieldPath = functionalRoleThreshold.getFieldPath();
- final Long thresholdValue = functionalRoleThreshold.getThresholdValue();
- final Direction direction = functionalRoleThreshold.getDirection();
- final EventSeverity severity = functionalRoleThreshold.getSeverity();
- if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null) {
- validationResponse.addErrorMessage("threshold",
- "TCA Policy threshold must have fieldPath, thresholdValue, direction and severity present."
- + "Threshold causing this validation error:" + functionalRoleThreshold);
+ // control Loop Schema type must be present
+ final ControlLoopSchemaType controlLoopSchemaType = metricsPerEventName.getControlLoopSchemaType();
+ if (controlLoopSchemaType == null) {
+ validationResponse.addErrorMessage("controlLoopEventType",
+ "TCA Policy controlLoopSchemaType is not present for metricsPerEventName:"
+ + metricsPerEventName);
+ }
+
+ // must have at least 1 threshold defined
+ if (metricsPerEventName.getThresholds() == null || metricsPerEventName.getThresholds().isEmpty()) {
+ validationResponse.addErrorMessage("thresholds",
+ "TCA Policy event Name must have at least one threshold. " +
+ "Event Name causing this validation error:" + metricsPerEventName);
+ } else {
+ // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity
+ final List<Threshold> eventNameThresholds = metricsPerEventName.getThresholds();
+ for (Threshold eventNameThreshold : eventNameThresholds) {
+ final String fieldPath = eventNameThreshold.getFieldPath();
+ final Long thresholdValue = eventNameThreshold.getThresholdValue();
+ final Direction direction = eventNameThreshold.getDirection();
+ final EventSeverity severity = eventNameThreshold.getSeverity();
+ final ControlLoopEventStatus closedLoopEventStatus = eventNameThreshold.getClosedLoopEventStatus();
+ if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null ||
+ closedLoopEventStatus == null) {
+ validationResponse.addErrorMessage("threshold",
+ "TCA Policy threshold must have fieldPath,thresholdValue,direction, " +
+ "closedLoopEventStatus and severity defined." +
+ "Threshold causing this validation error:" + eventNameThreshold);
+ }
}
}
}
-
-
return validationResponse;
}
}