aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java')
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java15
1 files changed, 11 insertions, 4 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
index b8460dc..a3a35cf 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java
@@ -20,10 +20,10 @@
package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+import co.cask.cdap.api.annotation.HashPartition;
import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.Property;
-import co.cask.cdap.api.annotation.RoundRobin;
import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
@@ -32,10 +32,12 @@ import co.cask.cdap.api.metrics.Metrics;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity;
import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
@@ -54,7 +56,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class);
@Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
- protected OutputEmitter<String> tcaAlertOutputEmitter;
+ protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter;
protected Metrics metrics;
private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable;
@@ -100,7 +102,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
* @throws JsonProcessingException if alert message cannot be parsed into JSON object
*/
@ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
- @RoundRobin
+ @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY)
public void filterVESMessages(String vesMessage) throws JsonProcessingException {
TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
@@ -127,7 +129,12 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
// Step 4: Emit message to Alert Sink Flowlet
- tcaAlertOutputEmitter.emit(alertMessage);
+ final ThresholdCalculatorOutput thresholdCalculatorOutput =
+ new ThresholdCalculatorOutput(processorContext.getMessage(),
+ TCAUtils.writeValueAsString(processorContext.getTCAPolicy()),
+ TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()),
+ alertMessage);
+ tcaAlertOutputEmitter.emit(thresholdCalculatorOutput);
} else {