diff options
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java')
-rw-r--r-- | dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java index b8460dc..a3a35cf 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -20,10 +20,10 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; +import co.cask.cdap.api.annotation.HashPartition; import co.cask.cdap.api.annotation.Output; import co.cask.cdap.api.annotation.ProcessInput; import co.cask.cdap.api.annotation.Property; -import co.cask.cdap.api.annotation.RoundRobin; import co.cask.cdap.api.dataset.lib.ObjectMappedTable; import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; import co.cask.cdap.api.flow.flowlet.FlowletContext; @@ -32,10 +32,12 @@ import co.cask.cdap.api.metrics.Metrics; import com.fasterxml.jackson.core.JsonProcessingException; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; @@ -54,7 +56,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) - protected OutputEmitter<String> tcaAlertOutputEmitter; + protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter; protected Metrics metrics; private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; @@ -100,7 +102,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { * @throws JsonProcessingException if alert message cannot be parsed into JSON object */ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) - @RoundRobin + @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY) public void filterVESMessages(String vesMessage) throws JsonProcessingException { TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; @@ -127,7 +129,12 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); // Step 4: Emit message to Alert Sink Flowlet - tcaAlertOutputEmitter.emit(alertMessage); + final ThresholdCalculatorOutput thresholdCalculatorOutput = + new ThresholdCalculatorOutput(processorContext.getMessage(), + TCAUtils.writeValueAsString(processorContext.getTCAPolicy()), + TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()), + alertMessage); + tcaAlertOutputEmitter.emit(thresholdCalculatorOutput); } else { |