diff options
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet')
5 files changed, 583 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java new file mode 100644 index 0000000..0a557e1 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java @@ -0,0 +1,128 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import org.onap.dcae.apod.analytics.aai.AAIClientFactory; +import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig; +import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Flowlet responsible for doing A&AI Enrichment + * + * @author Rajiv Singla . Creation Date: 9/20/2017. + */ +public class TCAVESAAIEnrichmentFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAAIEnrichmentFlowlet.class); + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT) + protected OutputEmitter<String> aaiEnrichmentOutputEmitter; + + private TCAAppPreferences tcaAppPreferences; + private AAIEnrichmentClient aaiEnrichmentClient; + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + if (tcaAppPreferences.getEnableAAIEnrichment()) { + final AAIHttpClientConfig aaiHttpClientConfig = + CDAPTCAUtils.createAAIEnrichmentClientConfig(tcaAppPreferences); + aaiEnrichmentClient = AAIClientFactory.create().getEnrichmentClient(aaiHttpClientConfig); + } + } + + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) + public void performAAIEnrichment(final String alertMessageString) throws IOException { + + // if A&AI enrichment is disabled - no A&AI lookups are required + if (!tcaAppPreferences.getEnableAAIEnrichment()) { + + LOG.debug("A&AI Enrichment is disabled. Skip A&AI Enrichment for alert: {}", alertMessageString); + aaiEnrichmentOutputEmitter.emit(alertMessageString); + + } else { + + // determine closed Loop Event Status + final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); + final ClosedLoopEventStatus closedLoopEventStatus = + ClosedLoopEventStatus.valueOf(tcavesResponse.getClosedLoopEventStatus()); + + if (closedLoopEventStatus == ClosedLoopEventStatus.ONSET) { + LOG.debug("Performing A&AI Enrichment of ONSET Alert: {}", alertMessageString); + final ControlLoopSchemaType controlLoopSchemaType = + TCAUtils.determineControlLoopSchemaType(tcavesResponse); + final String sourceName = TCAUtils.determineSourceName(tcavesResponse); + LOG.debug("A&AI Source Name: {}, Control Loop Schema Type: {} for ONSET Alert: {}", + sourceName, controlLoopSchemaType, alertMessageString); + + if (controlLoopSchemaType == ControlLoopSchemaType.VM) { + final String aaiVMEnrichmentAPIPath = tcaAppPreferences.getAaiVMEnrichmentAPIPath(); + TCAUtils.doAAIVMEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVMEnrichmentAPIPath, + alertMessageString, sourceName); + } else { + final String aaiVNFEnrichmentAPIPath = tcaAppPreferences.getAaiVNFEnrichmentAPIPath(); + TCAUtils.doAAIVNFEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVNFEnrichmentAPIPath, + alertMessageString, sourceName); + } + + final String aaiEnrichedAlert = TCAUtils.writeValueAsString(tcavesResponse); + LOG.debug("Emitting Alert after A&AI Enrichment: {}", aaiEnrichedAlert); + aaiEnrichmentOutputEmitter.emit(aaiEnrichedAlert); + + // skip A&AI Enrichment of alerts with closed Loop Event Status - ABATED + } else if (closedLoopEventStatus == ClosedLoopEventStatus.ABATED) { + LOG.debug("Skipping Enrichment of Abated Alert: {}", alertMessageString); + aaiEnrichmentOutputEmitter.emit(alertMessageString); + + } else { + // unsupported closed loop event status + final String errorMessage = String.format( + "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + + "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + } + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java new file mode 100644 index 0000000..759c3d5 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java @@ -0,0 +1,169 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import org.apache.commons.lang3.StringUtils; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; + +/** + * Flowlet responsible to sending out abatement alerts + * + * @author Rajiv Singla . Creation Date: 9/11/2017. + */ +public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class); + + @Property + private final String tcaAlertsAbatementTableName; + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) + protected OutputEmitter<String> alertsAbatementOutputEmitter; + + private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable; + + public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) { + this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName); + } + + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException { + + final String cefMessage = thresholdCalculatorOutput.getCefMessage(); + final String alertMessageString = thresholdCalculatorOutput.getAlertMessage(); + final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName(); + + // alerts must have violated metrics per event name present + if (StringUtils.isBlank(violatedMetricsPerEventNameString)) { + final String errorMessage = String.format( + "No violated metricsPerEventName found for VES Message: %s." + + "Ignored alert message: %s", cefMessage, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + + final MetricsPerEventName violatedMetricsPerEventName = + TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class); + final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class); + final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); + final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); + final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus(); + + switch (closedLoopEventStatus) { + + case ONSET: + + LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage); + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + null, tcaAlertsAbatementTable); + LOG.debug("Emitting ONSET alert: {}", alertMessageString); + alertsAbatementOutputEmitter.emit(alertMessageString); + break; + + case ABATED: + + LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold); + final TCAAlertsAbatementEntity previousAlertsAbatementEntry = + TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, + tcaAlertsAbatementTable); + + if (previousAlertsAbatementEntry != null) { + + LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry); + + final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS(); + if (abatementSentTS != null) { + LOG.debug("Abatement alert was already sent at timestamp: {}. " + + "Skip resending this abatement alert again", abatementSentTS); + } else { + + final long newAbatementSentTS = new Date().getTime(); + LOG.debug( + "No abatement alert was sent before." + + "Sending abatement alert:{} for the first time at:{}", + alertMessageString, newAbatementSentTS); + + // save new Abatement alert sent timestamp in table + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + Long.toString(newAbatementSentTS), tcaAlertsAbatementTable); + + // Set request id to be same as previous ONSET event request ID + tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId()); + final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse); + + LOG.info("Emitting ABATED alert: {}", abatedAlertString); + alertsAbatementOutputEmitter.emit(abatedAlertString); + + } + + } else { + LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.", + alertMessageString); + } + + break; + + default: + + final String errorMessage = String.format( + "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + + "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + + } + + + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java new file mode 100644 index 0000000..7df0d49 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java @@ -0,0 +1,71 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; + +/** + * Saves TCA VES Alert Messages in a Time series Table + * + * @author Rajiv Singla . Creation Date: 11/15/2016. + */ +public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet { + + @Property + private final String tcaVESAlertsTableName; + + private ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertsTable; + + public TCAVESAlertsSinkFlowlet(String tcaVESAlertsTableName) { + this.tcaVESAlertsTableName = tcaVESAlertsTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaVESAlertsTable = getContext().getDataset(tcaVESAlertsTableName); + } + + /** + * Saves messages to Alerts table + * + * @param alertMessage alert message + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT) + public void saveAlerts(String alertMessage) { + // Saves alert message in alerts table + TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable); + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java new file mode 100644 index 0000000..776a7e0 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java @@ -0,0 +1,59 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.flow.flowlet.StreamEvent; +import com.google.common.base.Charsets; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + + +/** + * TCA Message Router Flowlet emits VES Message to {@link TCAVESThresholdViolationCalculatorFlowlet} instances + * + * @author Rajiv Singla . Creation Date: 11/14/2016. + */ +public class TCAVESMessageRouterFlowlet extends AbstractFlowlet { + + /** + * Emits ves message to TCA Calculator Instances + */ + @Output(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + protected OutputEmitter<String> vesMessageEmitter; + + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET); + } + + @ProcessInput + public void routeVESMessage(StreamEvent vesMessageStreamEvent) { + final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString(); + vesMessageEmitter.emit( + vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java new file mode 100644 index 0000000..b639ff7 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -0,0 +1,156 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.HashPartition; +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.metrics.Metrics; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister.persist; + +/** + * TCA VES Message Filter filters out messages which are not applicable for TCA as per TCA Policy + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter; + protected Metrics metrics; + private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; + + @Property + private final String messageStatusTableName; + private Boolean enableAlertCEFFormat; + + private TCAPolicy tcaPolicy; + + /** + * Creates an instance of TCA VES Threshold violation calculator flowlet with give message status table name + * + * @param messageStatusTableName message status table name + */ + public TCAVESThresholdViolationCalculatorFlowlet(String messageStatusTableName) { + this.messageStatusTableName = messageStatusTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET); + } + + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + + // parse Runtime Arguments to tca policy preferences + tcaPolicy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext); + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + enableAlertCEFFormat = tcaAppPreferences.getEnableAlertCEFFormat(); + vesMessageStatusTable = getContext().getDataset(messageStatusTableName); + + } + + /** + * Filters VES Messages that violates TCA Policy + * + * @param vesMessage VES Message + * @throws JsonProcessingException if alert message cannot be parsed into JSON object + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY) + public void filterVESMessages(String vesMessage) throws JsonProcessingException { + + TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; + String alertMessage = null; + + // Step 1: Filter incoming messages + final TCACEFProcessorContext processorContext = TCAUtils.filterCEFMessage(vesMessage, tcaPolicy); + + if (processorContext.canProcessingContinue()) { + + // Step 2: Check if CEF Message violate any thresholds + final TCACEFProcessorContext processorContextWithViolations = + TCAUtils.computeThresholdViolations(processorContext); + + if (processorContextWithViolations.canProcessingContinue()) { + + // Step 3: Create Alert Message + final String tcaAppName = getContext().getApplicationSpecification().getName(); + alertMessage = + TCAUtils.createTCAAlertString(processorContextWithViolations, tcaAppName, enableAlertCEFFormat); + calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; + LOG.debug("VES Threshold Violation Detected. An alert message is be generated. {}", alertMessage); + + metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); + + // Step 4: Emit message to Alert Sink Flowlet + final ThresholdCalculatorOutput thresholdCalculatorOutput = + new ThresholdCalculatorOutput(processorContext.getMessage(), + TCAUtils.writeValueAsString(processorContext.getTCAPolicy()), + TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()), + alertMessage); + tcaAlertOutputEmitter.emit(thresholdCalculatorOutput); + + } else { + + calculatorMessageType = TCACalculatorMessageType.COMPLIANT; + metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); + } + + } else { + + metrics.count(CDAPMetricsConstants.TCA_VES_INAPPLICABLE_MESSAGES_METRIC, 1); + } + + // save message to message status table + final int instanceId = getContext().getInstanceId(); + persist(processorContext, instanceId, calculatorMessageType, vesMessageStatusTable, alertMessage); + } + + +} |