diff options
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/onap/dcae')
22 files changed, 2909 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java new file mode 100644 index 0000000..19d284a --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java @@ -0,0 +1,116 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca; + +import co.cask.cdap.api.app.AbstractApplication; +import co.cask.cdap.api.data.stream.Stream; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; +import org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.onap.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; +import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator; +import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker; +import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker; +import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> { + + private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class); + + @Override + @SuppressWarnings("unchecked") + public void configure() { + + + // ========= Application configuration Setup ============== // + final TCAAppConfig tcaAppConfig = getConfig(); + + LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig); + + // Validate application configuration + ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator()); + + // App Setup + setName(tcaAppConfig.getAppName()); + setDescription(tcaAppConfig.getAppDescription()); + + // ========== Streams Setup ============== // + // Create DMaaP MR Subscriber CDAP output stream + final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName(); + LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName); + final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName, + CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM); + addStream(subscriberOutputStream); + + + // ============ Datasets Setup ======== // + // Create TCA Message Status Table + final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName(); + final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds(); + LOG.info("Creating TCA Message Status Table: {} with TTL: {}", + tcaVESMessageStatusTableName, messageStatusTableTTLSeconds); + final DatasetProperties messageStatusTableProperties = + TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds); + createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties); + + + // Create TCA Alerts Abatement Table + final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName(); + final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds(); + LOG.info("Creating Alerts Abatement Table: {} with TTL: {}", + tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds); + final DatasetProperties alertsAbatementTableProperties = + TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds); + createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties); + + // Create TCA VES Alerts Table + final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName(); + final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds(); + LOG.info("Creating TCA Alerts Table: {} with TTL: {}", + tcaVESAlertsTableName, alertsTableTTLSeconds); + final DatasetProperties alertTableProperties = + TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds); + createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties); + + // =========== Flow Setup ============= // + addFlow(new TCAVESCollectorFlow(tcaAppConfig)); + + // ========== Workers Setup =========== // + LOG.info("Creating TCA DMaaP Subscriber Worker"); + addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); + LOG.info("Creating TCA DMaaP Publisher Worker"); + addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName())); + // TODO: Remove this before going to production + addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java new file mode 100644 index 0000000..adefbe0 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java @@ -0,0 +1,82 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flow; + +import co.cask.cdap.api.flow.AbstractFlow; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAAIEnrichmentFlowlet; +import org.onap.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet; +import org.onap.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet; +import org.onap.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet; +import org.onap.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; + +/** + * TCA Flow for VES (Virtual Event Streaming) Collector Flow + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAVESCollectorFlow extends AbstractFlow { + + private final TCAAppConfig tcaAppConfig; + + public TCAVESCollectorFlow(TCAAppConfig tcaAppConfig) { + this.tcaAppConfig = tcaAppConfig; + } + + @Override + protected void configure() { + + setName(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_NAME_FLOW); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_DESCRIPTION_FLOW); + + final TCAVESMessageRouterFlowlet messageRouterFlowlet = new TCAVESMessageRouterFlowlet(); + addFlowlet(messageRouterFlowlet); + + final TCAVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = + new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName()); + addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances()); + + final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet = + new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName()); + addFlowlet(tcavesAlertsAbatementFlowlet); + + final TCAVESAAIEnrichmentFlowlet tcavesaaiEnrichmentFlowlet = new TCAVESAAIEnrichmentFlowlet(); + addFlowlet(tcavesaaiEnrichmentFlowlet); + + final TCAVESAlertsSinkFlowlet alertsSinkFlowlet = + new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName()); + addFlowlet(alertsSinkFlowlet); + + + // connect DMaaP MR VES Subscriber output stream to VES Message Router Flowlet + connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet); + // connect message router to VES threshold calculator + connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet); + // connect VES threshold calculator flowlet to Alerts Abatement Flowlet + connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet); + // connect Alerts Abatement flowlet to AAI Enrichment Flowlet + connect(tcavesAlertsAbatementFlowlet, tcavesaaiEnrichmentFlowlet); + // connect A&AI Enrichment flowlet to Alerts Sink Flowlet + connect(tcavesaaiEnrichmentFlowlet, alertsSinkFlowlet); + + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java new file mode 100644 index 0000000..0a557e1 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAAIEnrichmentFlowlet.java @@ -0,0 +1,128 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import org.onap.dcae.apod.analytics.aai.AAIClientFactory; +import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig; +import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Flowlet responsible for doing A&AI Enrichment + * + * @author Rajiv Singla . Creation Date: 9/20/2017. + */ +public class TCAVESAAIEnrichmentFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAAIEnrichmentFlowlet.class); + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT) + protected OutputEmitter<String> aaiEnrichmentOutputEmitter; + + private TCAAppPreferences tcaAppPreferences; + private AAIEnrichmentClient aaiEnrichmentClient; + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + if (tcaAppPreferences.getEnableAAIEnrichment()) { + final AAIHttpClientConfig aaiHttpClientConfig = + CDAPTCAUtils.createAAIEnrichmentClientConfig(tcaAppPreferences); + aaiEnrichmentClient = AAIClientFactory.create().getEnrichmentClient(aaiHttpClientConfig); + } + } + + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) + public void performAAIEnrichment(final String alertMessageString) throws IOException { + + // if A&AI enrichment is disabled - no A&AI lookups are required + if (!tcaAppPreferences.getEnableAAIEnrichment()) { + + LOG.debug("A&AI Enrichment is disabled. Skip A&AI Enrichment for alert: {}", alertMessageString); + aaiEnrichmentOutputEmitter.emit(alertMessageString); + + } else { + + // determine closed Loop Event Status + final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); + final ClosedLoopEventStatus closedLoopEventStatus = + ClosedLoopEventStatus.valueOf(tcavesResponse.getClosedLoopEventStatus()); + + if (closedLoopEventStatus == ClosedLoopEventStatus.ONSET) { + LOG.debug("Performing A&AI Enrichment of ONSET Alert: {}", alertMessageString); + final ControlLoopSchemaType controlLoopSchemaType = + TCAUtils.determineControlLoopSchemaType(tcavesResponse); + final String sourceName = TCAUtils.determineSourceName(tcavesResponse); + LOG.debug("A&AI Source Name: {}, Control Loop Schema Type: {} for ONSET Alert: {}", + sourceName, controlLoopSchemaType, alertMessageString); + + if (controlLoopSchemaType == ControlLoopSchemaType.VM) { + final String aaiVMEnrichmentAPIPath = tcaAppPreferences.getAaiVMEnrichmentAPIPath(); + TCAUtils.doAAIVMEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVMEnrichmentAPIPath, + alertMessageString, sourceName); + } else { + final String aaiVNFEnrichmentAPIPath = tcaAppPreferences.getAaiVNFEnrichmentAPIPath(); + TCAUtils.doAAIVNFEnrichment(tcavesResponse, aaiEnrichmentClient, aaiVNFEnrichmentAPIPath, + alertMessageString, sourceName); + } + + final String aaiEnrichedAlert = TCAUtils.writeValueAsString(tcavesResponse); + LOG.debug("Emitting Alert after A&AI Enrichment: {}", aaiEnrichedAlert); + aaiEnrichmentOutputEmitter.emit(aaiEnrichedAlert); + + // skip A&AI Enrichment of alerts with closed Loop Event Status - ABATED + } else if (closedLoopEventStatus == ClosedLoopEventStatus.ABATED) { + LOG.debug("Skipping Enrichment of Abated Alert: {}", alertMessageString); + aaiEnrichmentOutputEmitter.emit(alertMessageString); + + } else { + // unsupported closed loop event status + final String errorMessage = String.format( + "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + + "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + } + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java new file mode 100644 index 0000000..759c3d5 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java @@ -0,0 +1,169 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import org.apache.commons.lang3.StringUtils; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; + +/** + * Flowlet responsible to sending out abatement alerts + * + * @author Rajiv Singla . Creation Date: 9/11/2017. + */ +public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class); + + @Property + private final String tcaAlertsAbatementTableName; + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) + protected OutputEmitter<String> alertsAbatementOutputEmitter; + + private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable; + + public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) { + this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName); + } + + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException { + + final String cefMessage = thresholdCalculatorOutput.getCefMessage(); + final String alertMessageString = thresholdCalculatorOutput.getAlertMessage(); + final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName(); + + // alerts must have violated metrics per event name present + if (StringUtils.isBlank(violatedMetricsPerEventNameString)) { + final String errorMessage = String.format( + "No violated metricsPerEventName found for VES Message: %s." + + "Ignored alert message: %s", cefMessage, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + + final MetricsPerEventName violatedMetricsPerEventName = + TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class); + final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class); + final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class); + final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); + final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus(); + + switch (closedLoopEventStatus) { + + case ONSET: + + LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage); + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + null, tcaAlertsAbatementTable); + LOG.debug("Emitting ONSET alert: {}", alertMessageString); + alertsAbatementOutputEmitter.emit(alertMessageString); + break; + + case ABATED: + + LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold); + final TCAAlertsAbatementEntity previousAlertsAbatementEntry = + TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, + tcaAlertsAbatementTable); + + if (previousAlertsAbatementEntry != null) { + + LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry); + + final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS(); + if (abatementSentTS != null) { + LOG.debug("Abatement alert was already sent at timestamp: {}. " + + "Skip resending this abatement alert again", abatementSentTS); + } else { + + final long newAbatementSentTS = new Date().getTime(); + LOG.debug( + "No abatement alert was sent before." + + "Sending abatement alert:{} for the first time at:{}", + alertMessageString, newAbatementSentTS); + + // save new Abatement alert sent timestamp in table + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + Long.toString(newAbatementSentTS), tcaAlertsAbatementTable); + + // Set request id to be same as previous ONSET event request ID + tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId()); + final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse); + + LOG.info("Emitting ABATED alert: {}", abatedAlertString); + alertsAbatementOutputEmitter.emit(abatedAlertString); + + } + + } else { + LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.", + alertMessageString); + } + + break; + + default: + + final String errorMessage = String.format( + "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." + + "Ignoring alert: %s", closedLoopEventStatus, alertMessageString); + throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage)); + + } + + + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java new file mode 100644 index 0000000..7df0d49 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java @@ -0,0 +1,71 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; + +/** + * Saves TCA VES Alert Messages in a Time series Table + * + * @author Rajiv Singla . Creation Date: 11/15/2016. + */ +public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet { + + @Property + private final String tcaVESAlertsTableName; + + private ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertsTable; + + public TCAVESAlertsSinkFlowlet(String tcaVESAlertsTableName) { + this.tcaVESAlertsTableName = tcaVESAlertsTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaVESAlertsTable = getContext().getDataset(tcaVESAlertsTableName); + } + + /** + * Saves messages to Alerts table + * + * @param alertMessage alert message + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_AAI_ENRICHMENT_NAME_OUTPUT) + public void saveAlerts(String alertMessage) { + // Saves alert message in alerts table + TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable); + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java new file mode 100644 index 0000000..776a7e0 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java @@ -0,0 +1,59 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.flow.flowlet.StreamEvent; +import com.google.common.base.Charsets; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + + +/** + * TCA Message Router Flowlet emits VES Message to {@link TCAVESThresholdViolationCalculatorFlowlet} instances + * + * @author Rajiv Singla . Creation Date: 11/14/2016. + */ +public class TCAVESMessageRouterFlowlet extends AbstractFlowlet { + + /** + * Emits ves message to TCA Calculator Instances + */ + @Output(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + protected OutputEmitter<String> vesMessageEmitter; + + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET); + } + + @ProcessInput + public void routeVESMessage(StreamEvent vesMessageStreamEvent) { + final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString(); + vesMessageEmitter.emit( + vesMessage, AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java new file mode 100644 index 0000000..b639ff7 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -0,0 +1,156 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.HashPartition; +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.metrics.Metrics; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister.persist; + +/** + * TCA VES Message Filter filters out messages which are not applicable for TCA as per TCA Policy + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter; + protected Metrics metrics; + private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; + + @Property + private final String messageStatusTableName; + private Boolean enableAlertCEFFormat; + + private TCAPolicy tcaPolicy; + + /** + * Creates an instance of TCA VES Threshold violation calculator flowlet with give message status table name + * + * @param messageStatusTableName message status table name + */ + public TCAVESThresholdViolationCalculatorFlowlet(String messageStatusTableName) { + this.messageStatusTableName = messageStatusTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET); + } + + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + + // parse Runtime Arguments to tca policy preferences + tcaPolicy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext); + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + enableAlertCEFFormat = tcaAppPreferences.getEnableAlertCEFFormat(); + vesMessageStatusTable = getContext().getDataset(messageStatusTableName); + + } + + /** + * Filters VES Messages that violates TCA Policy + * + * @param vesMessage VES Message + * @throws JsonProcessingException if alert message cannot be parsed into JSON object + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY) + public void filterVESMessages(String vesMessage) throws JsonProcessingException { + + TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; + String alertMessage = null; + + // Step 1: Filter incoming messages + final TCACEFProcessorContext processorContext = TCAUtils.filterCEFMessage(vesMessage, tcaPolicy); + + if (processorContext.canProcessingContinue()) { + + // Step 2: Check if CEF Message violate any thresholds + final TCACEFProcessorContext processorContextWithViolations = + TCAUtils.computeThresholdViolations(processorContext); + + if (processorContextWithViolations.canProcessingContinue()) { + + // Step 3: Create Alert Message + final String tcaAppName = getContext().getApplicationSpecification().getName(); + alertMessage = + TCAUtils.createTCAAlertString(processorContextWithViolations, tcaAppName, enableAlertCEFFormat); + calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; + LOG.debug("VES Threshold Violation Detected. An alert message is be generated. {}", alertMessage); + + metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); + + // Step 4: Emit message to Alert Sink Flowlet + final ThresholdCalculatorOutput thresholdCalculatorOutput = + new ThresholdCalculatorOutput(processorContext.getMessage(), + TCAUtils.writeValueAsString(processorContext.getTCAPolicy()), + TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()), + alertMessage); + tcaAlertOutputEmitter.emit(thresholdCalculatorOutput); + + } else { + + calculatorMessageType = TCACalculatorMessageType.COMPLIANT; + metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); + } + + } else { + + metrics.count(CDAPMetricsConstants.TCA_VES_INAPPLICABLE_MESSAGES_METRIC, 1); + } + + // save message to message status table + final int instanceId = getContext().getInstanceId(); + persist(processorContext, instanceId, calculatorMessageType, vesMessageStatusTable, alertMessage); + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java new file mode 100644 index 0000000..21145b0 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java @@ -0,0 +1,110 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.settings; + +import com.google.common.base.Objects; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.settings.CDAPBaseAppConfig; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + + +/** + * Contains CDAP App Config Settings for TCA Application + * + * @author Rajiv Singla . Creation Date: 11/2/2016. + */ +public class TCAAppConfig extends CDAPBaseAppConfig { + + + private static final long serialVersionUID = 1L; + + protected String tcaSubscriberOutputStreamName; + protected Integer thresholdCalculatorFlowletInstances; + + protected String tcaVESMessageStatusTableName; + protected Integer tcaVESMessageStatusTableTTLSeconds; + protected String tcaVESAlertsTableName; + protected Integer tcaVESAlertsTableTTLSeconds; + protected String tcaAlertsAbatementTableName; + protected Integer tcaAlertsAbatementTableTTLSeconds; + + + public TCAAppConfig() { + appName = CDAPComponentsConstants.TCA_DEFAULT_NAME_APP; + appDescription = CDAPComponentsConstants.TCA_DEFAULT_DESCRIPTION_APP; + tcaSubscriberOutputStreamName = CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM; + thresholdCalculatorFlowletInstances = AnalyticsConstants.TCA_DEFAULT_THRESHOLD_CALCULATOR_FLOWLET_INSTANCES; + tcaVESMessageStatusTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_NAME_TABLE; + tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE; + tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE; + tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE; + tcaAlertsAbatementTableName = CDAPComponentsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE; + tcaAlertsAbatementTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_ALERTS_ABATEMENT_TTL_TABLE; + } + + public String getTcaSubscriberOutputStreamName() { + return tcaSubscriberOutputStreamName; + } + + public String getTcaVESMessageStatusTableName() { + return tcaVESMessageStatusTableName; + } + + public Integer getTcaVESMessageStatusTableTTLSeconds() { + return tcaVESMessageStatusTableTTLSeconds; + } + + public String getTcaVESAlertsTableName() { + return tcaVESAlertsTableName; + } + + public Integer getTcaVESAlertsTableTTLSeconds() { + return tcaVESAlertsTableTTLSeconds; + } + + public Integer getThresholdCalculatorFlowletInstances() { + return thresholdCalculatorFlowletInstances; + } + + public String getTcaAlertsAbatementTableName() { + return tcaAlertsAbatementTableName; + } + + public Integer getTcaAlertsAbatementTableTTLSeconds() { + return tcaAlertsAbatementTableTTLSeconds; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("appName", appName) + .add("appDescription", appDescription) + .add("tcaSubscriberOutputStreamName", tcaSubscriberOutputStreamName) + .add("thresholdCalculatorFlowletInstances", thresholdCalculatorFlowletInstances) + .add("tcaVESMessageStatusTableName", tcaVESMessageStatusTableName) + .add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds) + .add("tcaVESAlertsTableName", tcaVESAlertsTableName) + .add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds) + .add("tcaAlertsAbatementTableName", tcaAlertsAbatementTableName) + .add("tcaAlertsAbatementTableTTLSeconds", tcaAlertsAbatementTableTTLSeconds) + .toString(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java new file mode 100644 index 0000000..b55ab4f --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java @@ -0,0 +1,349 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.settings; + +import com.google.common.base.Objects; +import org.onap.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + +/** + * <p> + * App Preferences for Analytics TCA (Threshold Crossing Alert) App + * <p> + * @author Rajiv Singla . Creation Date: 10/4/2016. + */ +public class TCAAppPreferences implements CDAPAppPreferences { + + private static final long serialVersionUID = 1L; + + // subscriber preferences + protected String subscriberHostName; + + protected Integer subscriberHostPort; + + protected String subscriberTopicName; + + protected String subscriberProtocol; + + protected String subscriberUserName; + + protected String subscriberUserPassword; + + protected String subscriberContentType; + + protected String subscriberConsumerId; + + protected String subscriberConsumerGroup; + + protected Integer subscriberTimeoutMS; + + protected Integer subscriberMessageLimit; + + protected Integer subscriberPollingInterval; + + // publisher preferences + protected String publisherHostName; + + protected Integer publisherHostPort; + + protected String publisherTopicName; + + protected String publisherProtocol; + + protected String publisherUserName; + + protected String publisherUserPassword; + + protected String publisherContentType; + + protected Integer publisherMaxBatchSize; + + protected Integer publisherMaxRecoveryQueueSize; + + protected Integer publisherPollingInterval; + + protected Boolean enableAlertCEFFormat; + + + // A&AI Enrichment + + protected Boolean enableAAIEnrichment; + + protected String aaiEnrichmentHost; + + protected Integer aaiEnrichmentPortNumber; + + protected String aaiEnrichmentProtocol; + + protected String aaiEnrichmentUserName; + + protected String aaiEnrichmentUserPassword; + + protected Boolean aaiEnrichmentIgnoreSSLCertificateErrors; + + protected String aaiVNFEnrichmentAPIPath; + + protected String aaiVMEnrichmentAPIPath; + + + // A&AI Enrichment Proxy + + protected String aaiEnrichmentProxyURL; + + /** + * Default constructor to setup default values for TCA App Preferences + */ + public TCAAppPreferences() { + + // subscriber defaults + subscriberPollingInterval = AnalyticsConstants.TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS; + + // publisher defaults + publisherMaxBatchSize = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE; + publisherMaxRecoveryQueueSize = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + publisherPollingInterval = AnalyticsConstants.TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS; + + enableAlertCEFFormat = AnalyticsConstants.TCA_DEFAULT_ENABLE_CEF_FORMATTED_ALERT; + + enableAAIEnrichment = AnalyticsConstants.TCA_DEFAULT_ENABLE_AAI_ENRICHMENT; + aaiEnrichmentIgnoreSSLCertificateErrors = + AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_IGNORE_SSL_CERTIFICATE_ERRORS; + aaiEnrichmentProxyURL = AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_PROXY_URL; + + } + + public String getSubscriberHostName() { + return subscriberHostName; + } + + public Integer getSubscriberHostPort() { + return subscriberHostPort; + } + + public String getSubscriberTopicName() { + return subscriberTopicName; + } + + public String getSubscriberProtocol() { + return subscriberProtocol; + } + + public String getSubscriberUserName() { + return subscriberUserName; + } + + public String getSubscriberUserPassword() { + return subscriberUserPassword; + } + + public String getSubscriberContentType() { + return subscriberContentType; + } + + public String getSubscriberConsumerId() { + return subscriberConsumerId; + } + + public String getSubscriberConsumerGroup() { + return subscriberConsumerGroup; + } + + public Integer getSubscriberTimeoutMS() { + return subscriberTimeoutMS; + } + + public Integer getSubscriberMessageLimit() { + return subscriberMessageLimit; + } + + public Integer getSubscriberPollingInterval() { + return subscriberPollingInterval; + } + + public String getPublisherHostName() { + return publisherHostName; + } + + public Integer getPublisherHostPort() { + return publisherHostPort; + } + + public String getPublisherTopicName() { + return publisherTopicName; + } + + public String getPublisherProtocol() { + return publisherProtocol; + } + + public String getPublisherUserName() { + return publisherUserName; + } + + public String getPublisherUserPassword() { + return publisherUserPassword; + } + + public String getPublisherContentType() { + return publisherContentType; + } + + public Integer getPublisherMaxBatchSize() { + return publisherMaxBatchSize; + } + + public Integer getPublisherMaxRecoveryQueueSize() { + return publisherMaxRecoveryQueueSize; + } + + public Integer getPublisherPollingInterval() { + return publisherPollingInterval; + } + + public Boolean getEnableAlertCEFFormat() { + return enableAlertCEFFormat; + } + + + public void setSubscriberHostName(String subscriberHostName) { + this.subscriberHostName = subscriberHostName; + } + + public void setSubscriberHostPort(Integer subscriberHostPort) { + this.subscriberHostPort = subscriberHostPort; + } + + public void setSubscriberTopicName(String subscriberTopicName) { + this.subscriberTopicName = subscriberTopicName; + } + + public void setSubscriberProtocol(String subscriberProtocol) { + this.subscriberProtocol = subscriberProtocol; + } + + public void setSubscriberUserName(String subscriberUserName) { + this.subscriberUserName = subscriberUserName; + } + + public void setSubscriberUserPassword(String subscriberUserPassword) { + this.subscriberUserPassword = subscriberUserPassword; + } + + public void setPublisherHostName(String publisherHostName) { + this.publisherHostName = publisherHostName; + } + + public void setPublisherHostPort(Integer publisherHostPort) { + this.publisherHostPort = publisherHostPort; + } + + public void setPublisherTopicName(String publisherTopicName) { + this.publisherTopicName = publisherTopicName; + } + + public void setPublisherProtocol(String publisherProtocol) { + this.publisherProtocol = publisherProtocol; + } + + public void setPublisherUserName(String publisherUserName) { + this.publisherUserName = publisherUserName; + } + + public void setPublisherUserPassword(String publisherUserPassword) { + this.publisherUserPassword = publisherUserPassword; + } + + public Boolean getEnableAAIEnrichment() { + return enableAAIEnrichment; + } + + public String getAaiEnrichmentHost() { + return aaiEnrichmentHost; + } + + public Integer getAaiEnrichmentPortNumber() { + return aaiEnrichmentPortNumber; + } + + public String getAaiEnrichmentProtocol() { + return aaiEnrichmentProtocol; + } + + public String getAaiEnrichmentUserName() { + return aaiEnrichmentUserName; + } + + public String getAaiEnrichmentUserPassword() { + return aaiEnrichmentUserPassword; + } + + public Boolean getAaiEnrichmentIgnoreSSLCertificateErrors() { + return aaiEnrichmentIgnoreSSLCertificateErrors; + } + + public String getAaiVNFEnrichmentAPIPath() { + return aaiVNFEnrichmentAPIPath; + } + + public String getAaiVMEnrichmentAPIPath() { + return aaiVMEnrichmentAPIPath; + } + + public String getAaiEnrichmentProxyURL() { + return aaiEnrichmentProxyURL; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("subscriberHostName", subscriberHostName) + .add("subscriberHostPort", subscriberHostPort) + .add("subscriberTopicName", subscriberTopicName) + .add("subscriberProtocol", subscriberProtocol) + .add("subscriberUserName", subscriberUserName) + .add("subscriberContentType", subscriberContentType) + .add("subscriberConsumerId", subscriberConsumerId) + .add("subscriberConsumerGroup", subscriberConsumerGroup) + .add("subscriberTimeoutMS", subscriberTimeoutMS) + .add("subscriberMessageLimit", subscriberMessageLimit) + .add("subscriberPollingInterval", subscriberPollingInterval) + .add("publisherHostName", publisherHostName) + .add("publisherHostPort", publisherHostPort) + .add("publisherTopicName", publisherTopicName) + .add("publisherProtocol", publisherProtocol) + .add("publisherUserName", publisherUserName) + .add("publisherContentType", publisherContentType) + .add("publisherMaxBatchSize", publisherMaxBatchSize) + .add("publisherMaxRecoveryQueueSize", publisherMaxRecoveryQueueSize) + .add("publisherPollingInterval", publisherPollingInterval) + .add("enableAlertCEFFormat", enableAlertCEFFormat) + .add("enableAAIEnrichment", enableAAIEnrichment) + .add("aaiEnrichmentHost", aaiEnrichmentHost) + .add("aaiEnrichmentPortNumber", aaiEnrichmentPortNumber) + .add("aaiEnrichmentProtocol", aaiEnrichmentProtocol) + .add("aaiEnrichmentUserName", aaiEnrichmentUserName) + .add("aaiEnrichmentIgnoreSSLCertificateErrors", aaiEnrichmentIgnoreSSLCertificateErrors) + .add("aaiVNFEnrichmentAPIPath", aaiVNFEnrichmentAPIPath) + .add("aaiVMEnrichmentAPIPath", aaiVMEnrichmentAPIPath) + .add("aaiEnrichmentProxyEnabled", aaiEnrichmentProxyURL == null ? "false" : "true") + .toString(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java new file mode 100644 index 0000000..5d86cfc --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java @@ -0,0 +1,36 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.settings; + +import org.onap.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; + +/** + * A wrapper over {@link TCAPolicy} to act as app Preferences as TCA Policy is passed + * by controller as runtime arguments from CDAP app preferences + * <p> + * @author Rajiv Singla . Creation Date: 11/29/2016. + */ +public class TCAPolicyPreferences extends TCAPolicy implements CDAPAppPreferences { + + private static final long serialVersionUID = 1L; + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java new file mode 100644 index 0000000..b987848 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java @@ -0,0 +1,97 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.utils; + +import com.google.common.base.Function; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; + +import javax.annotation.Nonnull; + +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + + +/** + * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} + * <p> + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +public class AppPreferencesToPublisherConfigMapper implements Function<TCAAppPreferences, DMaaPMRPublisherConfig> { + + /** + * Factory method to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object + * + * @param tcaAppPreferences tca App Preferences + * + * @return publisher config object + */ + public static DMaaPMRPublisherConfig map(final TCAAppPreferences tcaAppPreferences) { + return new AppPreferencesToPublisherConfigMapper().apply(tcaAppPreferences); + } + + /** + * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object + * + * @param tcaAppPreferences tca App Preferences + * + * @return publisher config object + */ + @Nonnull + @Override + public DMaaPMRPublisherConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { + + // Create a new publisher settings builder + final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = new DMaaPMRPublisherConfig.Builder( + tcaAppPreferences.getPublisherHostName(), tcaAppPreferences.getPublisherTopicName()); + + // Setup up any optional publisher parameters if they are present + final Integer publisherHostPort = tcaAppPreferences.getPublisherHostPort(); + if (publisherHostPort != null) { + publisherConfigBuilder.setPortNumber(publisherHostPort); + } + final String publisherProtocol = tcaAppPreferences.getPublisherProtocol(); + if (isPresent(publisherProtocol)) { + publisherConfigBuilder.setProtocol(publisherProtocol); + } + final String publisherUserName = tcaAppPreferences.getPublisherUserName(); + if (isPresent(publisherUserName)) { + publisherConfigBuilder.setUserName(publisherUserName); + } + final String publisherUserPassword = tcaAppPreferences.getPublisherUserPassword(); + if (isPresent(publisherUserPassword)) { + publisherConfigBuilder.setUserPassword(publisherUserPassword); + } + final String publisherContentType = tcaAppPreferences.getPublisherContentType(); + if (isPresent(publisherContentType)) { + publisherConfigBuilder.setContentType(publisherContentType); + } + final Integer publisherMaxBatchSize = tcaAppPreferences.getPublisherMaxBatchSize(); + if (publisherMaxBatchSize != null) { + publisherConfigBuilder.setMaxBatchSize(publisherMaxBatchSize); + } + final Integer publisherMaxRecoveryQueueSize = tcaAppPreferences.getPublisherMaxRecoveryQueueSize(); + if (publisherMaxRecoveryQueueSize != null) { + publisherConfigBuilder.setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize); + } + + return publisherConfigBuilder.build(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java new file mode 100644 index 0000000..650410c --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java @@ -0,0 +1,113 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.utils; + +import com.google.common.base.Function; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; + +import javax.annotation.Nonnull; + +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + + +/** + * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} + * + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +public class AppPreferencesToSubscriberConfigMapper implements Function<TCAAppPreferences, DMaaPMRSubscriberConfig> { + + /** + * Factory Method to converts {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object + * + * @param tcaAppPreferences tca app preferences + * @return DMaaP Subscriber Config + */ + public static DMaaPMRSubscriberConfig map(final TCAAppPreferences tcaAppPreferences) { + return new AppPreferencesToSubscriberConfigMapper().apply(tcaAppPreferences); + } + + /** + * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object + * + * @param tcaAppPreferences tca app preferences + * + * @return DMaaP Subscriber Config + */ + @Nonnull + @Override + public DMaaPMRSubscriberConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { + + // Create a new subscriber settings builder + final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder( + tcaAppPreferences.getSubscriberHostName(), tcaAppPreferences.getSubscriberTopicName()); + + // Setup up any optional subscriber parameters if they are present + final Integer subscriberHostPortNumber = tcaAppPreferences.getSubscriberHostPort(); + if (subscriberHostPortNumber != null) { + subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber); + } + + final String subscriberProtocol = tcaAppPreferences.getSubscriberProtocol(); + if (isPresent(subscriberProtocol)) { + subscriberConfigBuilder.setProtocol(subscriberProtocol); + } + + final String subscriberUserName = tcaAppPreferences.getSubscriberUserName(); + if (isPresent(subscriberUserName)) { + subscriberConfigBuilder.setUserName(subscriberUserName); + } + + final String subscriberUserPassword = tcaAppPreferences.getSubscriberUserPassword(); + if (isPresent(subscriberUserPassword)) { + subscriberConfigBuilder.setUserPassword(subscriberUserPassword); + } + + final String subscriberContentType = tcaAppPreferences.getSubscriberContentType(); + if (isPresent(subscriberContentType)) { + subscriberConfigBuilder.setContentType(subscriberContentType); + } + + final String subscriberConsumerId = tcaAppPreferences.getSubscriberConsumerId(); + if (isPresent(subscriberConsumerId)) { + subscriberConfigBuilder.setConsumerId(subscriberConsumerId); + } + + final String subscriberConsumerGroup = tcaAppPreferences.getSubscriberConsumerGroup(); + if (isPresent(subscriberConsumerGroup)) { + subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup); + } + + final Integer subscriberTimeoutMS = tcaAppPreferences.getSubscriberTimeoutMS(); + if (subscriberTimeoutMS != null) { + subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS); + } + final Integer subscriberMessageLimit = tcaAppPreferences.getSubscriberMessageLimit(); + if (subscriberMessageLimit != null) { + subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit); + } + + // return Subscriber settings + return subscriberConfigBuilder.build(); + + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java new file mode 100644 index 0000000..38f19aa --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java @@ -0,0 +1,321 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.utils; + +import co.cask.cdap.api.RuntimeContext; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig; +import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfigBuilder; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator; +import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.model.config.tca.DMAAPInfo; +import org.onap.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig; +import org.onap.dcae.apod.analytics.model.config.tca.TCAHandleIn; +import org.onap.dcae.apod.analytics.model.config.tca.TCAHandleOut; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static com.google.common.collect.Lists.newArrayList; +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings; +import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH; + +/** + * Utility Helper methods for CDAP TCA sub module. + * + * <p> + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +public abstract class CDAPTCAUtils extends TCAUtils { + + private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class); + + /** + * Function that extracts alert message string from {@link TCAVESAlertEntity} + */ + public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION = + new Function<TCAVESAlertEntity, String>() { + @Override + public String apply(TCAVESAlertEntity alertEntity) { + return alertEntity == null ? null : alertEntity.getAlertMessage(); + } + }; + + + /** + * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object + * + * @param runtimeContext Runtime Context + * + * @return validated runtime arguments as {@link TCAAppPreferences} object + */ + public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) { + // Parse runtime arguments + final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); + final TCAAppPreferences tcaAppPreferences = + ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class); + + final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration(); + + // populate DMaaP Information from App Config String + populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences); + + // Validate runtime arguments + validateSettings(tcaAppPreferences, new TCAPreferencesValidator()); + + return tcaAppPreferences; + } + + /** + * Creates an A&AI Http Client config from give {@link TCAAppPreferences} + * + * @param tcaAppPreferences TCA App Preferences + * + * @return A&AI Http Client config + */ + public static AAIHttpClientConfig createAAIEnrichmentClientConfig(final TCAAppPreferences tcaAppPreferences) { + final String aaiEnrichmentProxyURLString = tcaAppPreferences.getAaiEnrichmentProxyURL(); + URL aaiEnrichmentProxyURL = null; + if (StringUtils.isNotBlank(aaiEnrichmentProxyURLString)) { + aaiEnrichmentProxyURL = parseURL(aaiEnrichmentProxyURLString); + } + + return new AAIHttpClientConfigBuilder(tcaAppPreferences.getAaiEnrichmentHost()) + .setAaiProtocol(tcaAppPreferences.getAaiEnrichmentProtocol()) + .setAaiHostPortNumber(tcaAppPreferences.getAaiEnrichmentPortNumber()) + .setAaiUserName(tcaAppPreferences.getAaiEnrichmentUserName()) + .setAaiUserPassword(tcaAppPreferences.getAaiEnrichmentUserPassword()) + .setAaiProxyURL(aaiEnrichmentProxyURL) + .setAaiIgnoreSSLCertificateErrors(tcaAppPreferences.getAaiEnrichmentIgnoreSSLCertificateErrors()) + .build(); + } + + /** + * Populated App Preferences DMaaP Information from Application Config String + * + * @param appConfigString CDAP Application config String + * @param tcaAppPreferences TCA App Preferences + */ + private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString, + final TCAAppPreferences tcaAppPreferences) { + + if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) { + LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config"); + return; + } + + LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString); + + try { + final TCAControllerAppConfig tcaControllerAppConfig = + readValue(appConfigString, TCAControllerAppConfig.class); + + // Parse Subscriber DMaaP information from App Config String + if (tcaControllerAppConfig.getStreamsSubscribes() != null && + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null && + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) { + + final DMAAPInfo subscriberDmaapInfo = + tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo(); + LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl()); + final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl()); + tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol()); + tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost()); + final int subscriberUrlPort = subscriberUrl.getPort() != -1 ? + subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol()); + tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort); + tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8)); + + final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn(); + tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName()); + tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword()); + } else { + LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString); + } + + + // Parse Publisher DMaaP information from App Config String + if (tcaControllerAppConfig.getStreamsPublishes() != null && + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null && + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) { + + final DMAAPInfo publisherDmaapInfo = + tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo(); + LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl()); + final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl()); + tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol()); + tcaAppPreferences.setPublisherHostName(publisherUrl.getHost()); + final int publisherUrlPort = publisherUrl.getPort() != -1 ? + publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol()); + tcaAppPreferences.setPublisherHostPort(publisherUrlPort); + tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8)); + + final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut(); + tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName()); + tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword()); + } else { + LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString); + } + + + } catch (IOException e) { + throw new CDAPSettingsException( + "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e); + } + } + + /** + * Parses provided DMaaP MR URL string to {@link URL} object + * + * @param urlString url string + * + * @return url object + */ + private static URL parseURL(final String urlString) { + try { + return new URL(urlString); + } catch (MalformedURLException e) { + final String errorMessage = String.format("Invalid URL format: %s", urlString); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + /** + * Sets up default DMaaP Port if not provided with DMaaP URL + * + * @param protocol protocol e.g. http or https + * + * @return default DMaaP MR port number + */ + private static int getDefaultDMaaPPort(final String protocol) { + if ("http".equals(protocol)) { + return 3904; + } else if ("https".equals(protocol)) { + return 3905; + } else { + return 80; + } + } + + + /** + * Extracts alert message strings from {@link TCAVESAlertEntity} + * + * @param alertEntities collection of alert entities + * + * @return List of alert message strings + */ + public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) { + return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION); + } + + + /** + * Converts Runtime Arguments to {@link TCAPolicyPreferences} object + * + * @param runtimeContext CDAP Runtime Arguments + * + * @return TCA Policy Preferences + */ + public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) { + + final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); + final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments); + + LOG.debug("Printing all Received Runtime Arguments:"); + for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) { + LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue()); + } + + TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences(); + + final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY); + + if (StringUtils.isNotBlank(tcaPolicyJsonString)) { + + LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}", + AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString); + + // initialize unquotedTCAPolicy + String unquotedTCAPolicy = tcaPolicyJsonString.trim(); + + //remove starting and ending quote from passed tca policy Json string if present + if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && + tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) { + unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1); + } + + try { + tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class); + } catch (IOException e) { + throw new CDAPSettingsException( + "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e); + } + + } else { // classical controller is being used. Validate preferences as received from classical controller + + LOG.info("TcaPolicy is being parsed as key value pair from classical controller"); + + // extract TCA Policy Domain from Runtime Arguments + final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH); + + // create new TCA Policy object + tcaPolicyPreferences.setDomain(policyDomain); + + // filter out other non relevant fields which are not related to tca policy + final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments, + TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH); + + // determine functional Roles + final Map<String, Map<String, String>> functionalRolesMap = + extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); + + // create metrics per functional role list + tcaPolicyPreferences.setMetricsPerEventName( + createTCAPolicyMetricsPerEventNameList(functionalRolesMap)); + + } + + // validate tca Policy Preferences + validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator()); + + LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences); + + return tcaPolicyPreferences; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java new file mode 100644 index 0000000..068119d --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java @@ -0,0 +1,62 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.validator; + +import org.onap.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * <p> + * TCA App Config Validator validates any TCA App Config parameter values + * </p> + * + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +public class TCAAppConfigValidator implements CDAPAppSettingsValidator<TCAAppConfig, + GenericValidationResponse<TCAAppConfig>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAAppConfig> validateAppSettings(TCAAppConfig tcaAppConfig) { + + final GenericValidationResponse<TCAAppConfig> validationResponse = new GenericValidationResponse<>(); + + if (isEmpty(tcaAppConfig.getTcaSubscriberOutputStreamName())) { + validationResponse.addErrorMessage("tcaSubscriberOutputStreamName", + "tcaSubscriberOutputStreamName must be present"); + } + + if (isEmpty(tcaAppConfig.getTcaVESMessageStatusTableName())) { + validationResponse.addErrorMessage("tcaVESMessageStatusTableName", + "tcaVESMessageStatusTableName must be present"); + } + if (isEmpty(tcaAppConfig.getTcaVESAlertsTableName())) { + validationResponse.addErrorMessage("tcaVESAlertsTableName", + "tcaVESAlertsTableName must be present"); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java new file mode 100644 index 0000000..118b852 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java @@ -0,0 +1,115 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.validator; + +import org.onap.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; +import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; + +import java.util.List; + +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * Validates TCA Policy Preferences + * <p> + * + * @author Rajiv Singla . Creation Date: 11/29/2016. + */ +public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<TCAPolicyPreferences, + GenericValidationResponse<TCAPolicyPreferences>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAPolicyPreferences> validateAppSettings( + final TCAPolicyPreferences tcaPolicyPreferences) { + + final GenericValidationResponse<TCAPolicyPreferences> validationResponse = new GenericValidationResponse<>(); + + // validate TCA Policy must domain present + final String domain = tcaPolicyPreferences.getDomain(); + if (isEmpty(domain)) { + validationResponse.addErrorMessage("domain", "TCA Policy must have only one domain present"); + } + + // validate TCA Policy must have at least one event name + final List<String> policyEventNames = TCAUtils.getPolicyEventNames(tcaPolicyPreferences); + if (policyEventNames.isEmpty()) { + validationResponse.addErrorMessage("metricsPerEventNames", + "TCA Policy must have at least one or more event names"); + } + + final List<MetricsPerEventName> metricsPerEventNames = + tcaPolicyPreferences.getMetricsPerEventName(); + + // validate Metrics Per Event Name + for (MetricsPerEventName metricsPerEventName : metricsPerEventNames) { + + // event name must be present + final String eventName = metricsPerEventName.getEventName(); + if (isEmpty(eventName)) { + validationResponse.addErrorMessage("eventName", + "TCA Policy eventName is not present for metricsPerEventName:" + metricsPerEventName); + } + + // control Loop Schema type must be present + final ControlLoopSchemaType controlLoopSchemaType = metricsPerEventName.getControlLoopSchemaType(); + if (controlLoopSchemaType == null) { + validationResponse.addErrorMessage("controlLoopEventType", + "TCA Policy controlLoopSchemaType is not present for metricsPerEventName:" + + metricsPerEventName); + } + + // must have at least 1 threshold defined + if (metricsPerEventName.getThresholds() == null || metricsPerEventName.getThresholds().isEmpty()) { + validationResponse.addErrorMessage("thresholds", + "TCA Policy event Name must have at least one threshold. " + + "Event Name causing this validation error:" + metricsPerEventName); + } else { + // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity + final List<Threshold> eventNameThresholds = metricsPerEventName.getThresholds(); + for (Threshold eventNameThreshold : eventNameThresholds) { + final String fieldPath = eventNameThreshold.getFieldPath(); + final Long thresholdValue = eventNameThreshold.getThresholdValue(); + final Direction direction = eventNameThreshold.getDirection(); + final EventSeverity severity = eventNameThreshold.getSeverity(); + final ClosedLoopEventStatus closedLoopEventStatus = eventNameThreshold.getClosedLoopEventStatus(); + if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null || + closedLoopEventStatus == null) { + validationResponse.addErrorMessage("threshold", + "TCA Policy threshold must have fieldPath,thresholdValue,direction, " + + "closedLoopEventStatus and severity defined." + + "Threshold causing this validation error:" + eventNameThreshold); + } + } + } + } + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java new file mode 100644 index 0000000..261b74d --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java @@ -0,0 +1,84 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.validator; + +import org.onap.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppPreferences, + GenericValidationResponse<TCAAppPreferences>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAAppPreferences> validateAppSettings(TCAAppPreferences appPreferences) { + + final GenericValidationResponse<TCAAppPreferences> validationResponse = new GenericValidationResponse<>(); + + // subscriber validations + final String subscriberHostName = appPreferences.getSubscriberHostName(); + if (isEmpty(subscriberHostName)) { + validationResponse.addErrorMessage("subscriberHostName", "Subscriber host name must be present"); + } + final String subscriberTopicName = appPreferences.getSubscriberTopicName(); + if (isEmpty(subscriberTopicName)) { + validationResponse.addErrorMessage("subscriberTopicName", "Subscriber topic name must be present"); + } + + // publisher validations + final String publisherHostName = appPreferences.getPublisherHostName(); + if (isEmpty(publisherHostName)) { + validationResponse.addErrorMessage("publisherHostName", "Publisher host name must be present"); + } + final String publisherTopicName = appPreferences.getPublisherTopicName(); + if (isEmpty(publisherTopicName)) { + validationResponse.addErrorMessage("publisherTopicName", "Publisher topic name must be present"); + } + + final Boolean enableAAIEnrichment = appPreferences.getEnableAAIEnrichment(); + + // if aai enrichment is enabled then do some aai validations + if (enableAAIEnrichment) { + final String aaiEnrichmentHost = appPreferences.getAaiEnrichmentHost(); + if (isEmpty(aaiEnrichmentHost)) { + validationResponse.addErrorMessage("aaiEnrichmentHost", "AAI Enrichment Host must be present"); + } + final String aaiVMEnrichmentAPIPath = appPreferences.getAaiVMEnrichmentAPIPath(); + if (isEmpty(aaiVMEnrichmentAPIPath)) { + validationResponse.addErrorMessage("aaiVMEnrichmentAPIPath", "AAI VM Enrichment path must be present"); + } + final String aaiVNFEnrichmentAPIPath = appPreferences.getAaiVNFEnrichmentAPIPath(); + if (isEmpty(aaiVNFEnrichmentAPIPath)) { + validationResponse.addErrorMessage("aaiVNFEnrichmentAPIPath", "AAI VNF Enrichment path must be " + + "present"); + } + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java new file mode 100644 index 0000000..f9deac8 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java @@ -0,0 +1,116 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.worker.AbstractWorker; +import com.google.common.base.Preconditions; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.String.format; + +/** + * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals + * <p> + * @author Rajiv Singla . Creation Date: 12/19/2016. + */ +public abstract class BaseTCADMaaPMRWorker extends AbstractWorker { + + private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class); + + /** + * Quartz Scheduler + */ + protected Scheduler scheduler; + /** + * Determines if scheduler is shutdown + */ + protected AtomicBoolean isSchedulerShutdown; + + + @Override + public void run() { + + Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); + String schedulerName = ""; + + // Start scheduler + try { + schedulerName = scheduler.getSchedulerName(); + scheduler.start(); + isSchedulerShutdown.getAndSet(false); + + } catch (SchedulerException e) { + final String errorMessage = + format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName); + + // indefinite loop which wakes up and confirms scheduler is indeed running + while (!isSchedulerShutdown.get()) { + try { + + Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS); + + } catch (InterruptedException e) { + + final String errorMessage = + format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s", + schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName); + + } + + @Override + public void stop() { + + Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); + String schedulerName = ""; + + // Stop Scheduler + try { + schedulerName = scheduler.getSchedulerName(); + LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName); + scheduler.shutdown(); + isSchedulerShutdown.getAndSet(true); + + } catch (SchedulerException e) { + + final String errorMessage = + format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java new file mode 100644 index 0000000..2114c8c --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java @@ -0,0 +1,200 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.TxRunnable; +import co.cask.cdap.api.common.Bytes; +import co.cask.cdap.api.data.DatasetContext; +import co.cask.cdap.api.dataset.lib.CloseableIterator; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import com.google.common.base.Joiner; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.tephra.TransactionFailureException; +import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.common.utils.HTTPUtils; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME; +import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME; +import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME; +import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME; + +/** + * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to + * DMaaP MR topic + *<p> + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +@DisallowConcurrentExecution +@PersistJobDataAfterExecution +@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") +public class TCADMaaPMRPublisherJob implements Job { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class); + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}", + jobExecutionContext.getNextFireTime()); + + // Get Job Data Map + final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); + + // Fetch all Job Params from Job Data Map + final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME); + final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME); + final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME); + final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME); + + LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName); + + // Get new alerts from alerts table + final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext); + + // If no new alerts are found - nothing to publish + if (newAlertsMap.isEmpty()) { + LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1); + return; + } + + final int newAlertsCount = newAlertsMap.size(); + LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName, + newAlertsCount); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount); + + // Get alert message strings from alert Entities + final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values()); + + // Publish messages to DMaaP MR Topic + try { + + final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages); + + final Integer responseCode = publisherResponse.getResponseCode(); + final String responseMessage = publisherResponse.getResponseMessage(); + final int pendingMessagesCount = publisherResponse.getPendingMessagesCount(); + + LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode, + responseMessage, pendingMessagesCount); + + if (HTTPUtils.isSuccessfulResponseCode(responseCode)) { + LOG.debug("Successfully Published alerts to DMaaP MR Topic."); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); + } else { + LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later...."); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); + } + + } catch (DCAEAnalyticsRuntimeException e) { + LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e); + } finally { + // delete send message from alerts table + deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics); + } + + LOG.debug("Finished DMaaP MR Topic Publisher fetch Job."); + + } + + /** + * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values + * + * @param cdapAlertsTableName alerts table name + * @param workerContext worker context + * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values + */ + protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName, + final WorkerContext workerContext) { + final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>(); + try { + workerContext.execute(new TxRunnable() { + @Override + public void run(DatasetContext context) throws Exception { + final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); + final Date currentTime = new Date(); + final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime); + final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey); + while (scan.hasNext()) { + final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next(); + newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue()); + } + } + }); + } catch (TransactionFailureException e) { + final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + return newAlertsMap; + } + + /** + * Deletes rows in Alerts table for give rowKeys + * + * @param cdapAlertsTableName CDAP Alerts Table Name + * @param workerContext Worker Context + * @param rowKeys Row Key Set + * @param metrics CDAP metrics + */ + protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext, + final Set<String> rowKeys, final Metrics metrics) { + LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys)); + try { + workerContext.execute(new TxRunnable() { + @Override + public void run(DatasetContext context) throws Exception { + final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); + for (String rowKey : rowKeys) { + alertsTable.delete(rowKey); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1); + } + } + }); + } catch (TransactionFailureException e) { + final String errorMessage = + "Transaction Error while deleting published alerts in alerts table: " + e.toString(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java new file mode 100644 index 0000000..1714d65 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java @@ -0,0 +1,114 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import com.google.common.base.Optional; +import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.onap.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static java.lang.String.format; + +/** + * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to + * a given CDAP Stream + * + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +@DisallowConcurrentExecution +@PersistJobDataAfterExecution +public class TCADMaaPMRSubscriberJob implements Job { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class); + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}", + jobExecutionContext.getNextFireTime()); + + // Get Job Data Map + final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); + + // Fetch all Job Params from Job Data Map + final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME); + final WorkerContext workerContext = + (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME); + final DMaaPMRSubscriber subscriber = + (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME); + final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME); + + final Optional<List<String>> subscriberMessagesOptional = + DMaaPMRUtils.getSubscriberMessages(subscriber, metrics); + + // Write message to CDAP Stream using Stream Writer + if (subscriberMessagesOptional.isPresent()) { + writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics); + } + } + + + /** + * Writes given messages to CDAP Stream + * + * @param actualMessages List of messages that need to written to cdap stream + * @param cdapStreamName cdap stream name + * @param workerContext cdap worker context + * @param metrics cdap metrics + */ + private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName, + final WorkerContext workerContext, final Metrics metrics) { + LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size()); + try { + + for (String message : actualMessages) { + workerContext.write(cdapStreamName, message); + } + + } catch (IOException e) { + metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1); + final String errorMessage = + format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " + + "Exception: %s", cdapStreamName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}", + cdapStreamName, actualMessages.size()); + + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java new file mode 100644 index 0000000..e8130f3 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java @@ -0,0 +1,141 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.worker.AbstractWorker; +import co.cask.cdap.api.worker.WorkerContext; +import com.fasterxml.jackson.core.type.TypeReference; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.onap.dcae.apod.analytics.tca.utils.TCAUtils.readValue; +import static org.onap.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString; + +/** + * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic. + * The mock instead of making DMaaP MR calls will actually take messages + * from file and send them to stream at subscriber polling interval + * + * TODO: To be removed before going to production - only for testing purposes + * + * @author Rajiv Singla . Creation Date: 11/4/2016. + */ +public class TCADMaaPMockSubscriberWorker extends AbstractWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class); + + // TODO: Remove this file before going to production - only for mocking purposes + private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json"; + private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE = + new TypeReference<List<EventListener>>() { + }; + + private TCAAppPreferences tcaAppPreferences; + private boolean stopSendingMessages; + @Property + private final String tcaSubscriberOutputStreamName; + + public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) { + this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; + } + + @Override + public void configure() { + setName("MockTCASubscriberWorker"); + setDescription("Writes Mocked VES messages to CDAP Stream"); + LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker"); + } + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences); + this.tcaAppPreferences = appPreferences; + this.stopSendingMessages = false; + } + + + @Override + public void run() { + final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); + LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval); + + final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream + (MOCK_MESSAGE_FILE_LOCATION); + + if (resourceAsStream == null) { + LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION); + throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException()); + } + + + try { + List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE); + + final int totalMessageCount = eventListeners.size(); + LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount); + + int i = 1; + for (EventListener eventListener : eventListeners) { + if (stopSendingMessages) { + LOG.debug("Stop sending messages......"); + break; + } + final String eventListenerString = writeValueAsString(eventListener); + LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount); + getContext().write(tcaSubscriberOutputStreamName, eventListenerString); + i++; + + try { + Thread.sleep(subscriberPollingInterval); + } catch (InterruptedException e) { + LOG.error("Error while sleeping"); + throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e); + } + } + + LOG.debug("Finished writing mock messages to CDAP Stream"); + + } catch (IOException e) { + LOG.error("Error while parsing json file"); + throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e); + } + + + } + + @Override + public void stop() { + stopSendingMessages = true; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java new file mode 100644 index 0000000..78dbd35 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java @@ -0,0 +1,146 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToPublisherConfigMapper; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.quartz.JobDataMap; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.String.format; + +/** + * TCA DMaaP Publisher will monitor alerts table at regular intervals and publish any alerts to DMaaP MR Publishing + * Topic + * <p> + * @author Rajiv Singla . Creation Date: 11/16/2016. + */ +public class TCADMaaPPublisherWorker extends BaseTCADMaaPMRWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPPublisherWorker.class); + + private DMaaPMRPublisher publisher; + private Metrics metrics; + @Property + private final String tcaVESAlertsTableName; + + public TCADMaaPPublisherWorker(final String tcaVESAlertsTableName) { + this.tcaVESAlertsTableName = tcaVESAlertsTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); + setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_DESCRIPTION_WORKER); + LOG.debug("Configuring TCA MR DMaaP Publisher worker with name: {}", + CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); + } + + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + + LOG.info("Initializing TCA MR DMaaP Publisher worker with preferences: {}", tcaAppPreferences); + + // Map TCA App Preferences to DMaaP MR Publisher Config + final DMaaPMRPublisherConfig publisherConfig = AppPreferencesToPublisherConfigMapper.map(tcaAppPreferences); + + LOG.info("TCA DMaaP MR Publisher worker will be polling TCA Alerts Table Name: {}", tcaVESAlertsTableName); + + // Create an instance of DMaaP MR Publisher + LOG.debug("Creating an instance of DMaaP Publisher"); + publisher = DMaaPMRFactory.create().createPublisher(publisherConfig); + + // initialize a new Quartz scheduler + initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); + + // initialize scheduler state + isSchedulerShutdown = new AtomicBoolean(true); + } + + + /** + * Stop DMaaP Publisher + */ + @Override + public void stop() { + // Close Publisher - which will flush any batch messages if present in batch queue + if (publisher != null) { + try { + publisher.close(); + } catch (Exception e) { + final String errorMessage = format("Error while shutting down DMaaP MR Publisher: %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + // Shut down scheduler + super.stop(); + } + + + /** + * Initializes a scheduler instance for DMaaP MR Publisher Job + * + * @throws SchedulerException SchedulerException + */ + private void initializeScheduler(final TCAAppPreferences tcaAnalyticsAppConfig, + final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { + + // Get Publisher polling interval + final Integer publisherPollingInterval = tcaAnalyticsAppConfig.getPublisherPollingInterval(); + + // Publisher Quartz Properties file + final String quartzPublisherPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_PUBLISHER_PROPERTIES_FILE_NAME; + + // Create a new JobDataMap containing information required by TCA DMaaP Publisher Job + final JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME, tcaVESAlertsTableName); + jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); + jobDataMap.put(AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME, publisher); + jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); + + // Create new publisher scheduler + scheduler = TCAUtils.createQuartzScheduler(publisherPollingInterval, stdSchedulerFactory, + quartzPublisherPropertiesFileName, jobDataMap, TCADMaaPMRPublisherJob.class, + AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_JOB_NAME, + AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_TRIGGER_NAME); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java new file mode 100644 index 0000000..64bf0d1 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java @@ -0,0 +1,124 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToSubscriberConfigMapper; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.quartz.JobDataMap; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * TCA DMaaP Subscriber will read messages and post them to cdap stream at regular intervals + * <p> + * @author Rajiv Singla . Creation Date: 10/14/2016. + */ +public class TCADMaaPSubscriberWorker extends BaseTCADMaaPMRWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPSubscriberWorker.class); + + private DMaaPMRSubscriber subscriber; + private Metrics metrics; + @Property + private final String tcaSubscriberOutputStreamName; + + public TCADMaaPSubscriberWorker(final String tcaSubscriberOutputStreamName) { + this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; + } + + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); + setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_DESCRIPTION_WORKER); + LOG.debug("Configuring TCA MR DMaaP Subscriber worker with name: {}", + CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); + } + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + + LOG.info("Initializing TCA MR DMaaP Subscriber worker with preferences: {}", tcaAppPreferences); + + // Map TCA App Preferences to DMaaP MR Subscriber Config + final DMaaPMRSubscriberConfig subscriberConfig = AppPreferencesToSubscriberConfigMapper.map(tcaAppPreferences); + + LOG.info("TCA DMaaP MR Subscriber worker will be writing to CDAP Stream: {}", tcaSubscriberOutputStreamName); + + // Create an instance of DMaaP MR Subscriber + LOG.debug("Creating an instance of DMaaP Subscriber"); + subscriber = DMaaPMRFactory.create().createSubscriber(subscriberConfig); + + // initialize a new Quartz scheduler + initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); + + // initialize scheduler state + isSchedulerShutdown = new AtomicBoolean(true); + } + + /** + * Initializes a scheduler instance for DMaaP MR Subscriber Job + * + * @throws SchedulerException SchedulerException + */ + private void initializeScheduler(final TCAAppPreferences tcaAppPreferences, + final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { + + // Get Subscriber polling interval + final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); + + // Subscriber Quartz Properties file + final String quartzSubscriberPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_SUBSCRIBER_PROPERTIES_FILE_NAME; + + // Create a new JobDataMap containing information required by TCA DMaaP Subscriber Job + final JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME, tcaSubscriberOutputStreamName); + jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); + jobDataMap.put(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME, subscriber); + jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); + + // Create new publisher scheduler + scheduler = TCAUtils.createQuartzScheduler(subscriberPollingInterval, stdSchedulerFactory, + quartzSubscriberPropertiesFileName, jobDataMap, TCADMaaPMRSubscriberJob.class, + AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_JOB_NAME, + AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_TRIGGER_NAME); + } + + +} |