diff options
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java')
20 files changed, 2252 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java new file mode 100644 index 0000000..1a8cb5e --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/TCAAnalyticsApplication.java @@ -0,0 +1,104 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca; + +import co.cask.cdap.api.app.AbstractApplication; +import co.cask.cdap.api.data.stream.Stream; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils; +import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; +import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator; +import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker; +import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker; +import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> { + + private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class); + + @Override + public void configure() { + + + // ========= Application configuration Setup ============== // + final TCAAppConfig tcaAppConfig = getConfig(); + + LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig); + + // Validate application configuration + ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator()); + + // App Setup + setName(tcaAppConfig.getAppName()); + setDescription(tcaAppConfig.getAppDescription()); + + // ========== Streams Setup ============== // + // Create DMaaP MR Subscriber CDAP output stream + final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName(); + LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName); + final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName, + CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM); + addStream(subscriberOutputStream); + + + // ============ Datasets Setup ======== // + // Create TCA Message Status Table + final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName(); + final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds(); + LOG.info("Creating TCA Message Status Table: {} with TTL: {}", + tcaVESMessageStatusTableName, messageStatusTableTTLSeconds); + final DatasetProperties messageStatusTableProperties = + TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds); + createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties); + + // Create TCA VES Alerts Table + final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName(); + final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds(); + LOG.info("Creating TCA Alerts Table: {} with TTL: {}", + tcaVESAlertsTableName, alertsTableTTLSeconds); + final DatasetProperties alertTableProperties = + TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds); + createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties); + + // =========== Flow Setup ============= // + addFlow(new TCAVESCollectorFlow(tcaAppConfig)); + + // ========== Workers Setup =========== // + LOG.info("Creating TCA DMaaP Subscriber Worker"); + addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); + LOG.info("Creating TCA DMaaP Publisher Worker"); + addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName())); + // TODO: Remove this before going to production + addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName())); + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java new file mode 100644 index 0000000..d880a12 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flow/TCAVESCollectorFlow.java @@ -0,0 +1,69 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.flow; + +import co.cask.cdap.api.flow.AbstractFlow; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet; +import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet; +import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; + +/** + * TCA Flow for VES (Virtual Event Streaming) Collector Flow + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAVESCollectorFlow extends AbstractFlow { + + private final TCAAppConfig tcaAppConfig; + + public TCAVESCollectorFlow(TCAAppConfig tcaAppConfig) { + this.tcaAppConfig = tcaAppConfig; + } + + @Override + protected void configure() { + + setName(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_NAME_FLOW); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_COLLECTOR_DESCRIPTION_FLOW); + + final TCAVESMessageRouterFlowlet messageRouterFlowlet = new TCAVESMessageRouterFlowlet(); + addFlowlet(messageRouterFlowlet); + + final TCAVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = + new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName()); + addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances()); + + final TCAVESAlertsSinkFlowlet alertsSinkFlowlet = + new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName()); + addFlowlet(alertsSinkFlowlet); + + + // connect DMaaP MR VES Subscriber output stream to VES Message Router Flowlet + connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet); + // connect message router to VES threshold calculator + connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet); + // connect VES threshold calculator flowlet to Alerts Sink Flowlet + connect(thresholdViolationCalculatorFlowlet, alertsSinkFlowlet); + + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java new file mode 100644 index 0000000..c2da943 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowlet.java @@ -0,0 +1,71 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; + +/** + * Saves TCA VES Alert Messages in a Time series Table + * + * @author Rajiv Singla . Creation Date: 11/15/2016. + */ +public class TCAVESAlertsSinkFlowlet extends AbstractFlowlet { + + @Property + private final String tcaVESAlertsTableName; + + private ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertsTable; + + public TCAVESAlertsSinkFlowlet(String tcaVESAlertsTableName) { + this.tcaVESAlertsTableName = tcaVESAlertsTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET); + } + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + tcaVESAlertsTable = getContext().getDataset(tcaVESAlertsTableName); + } + + /** + * Saves messages to Alerts table + * + * @param alertMessage alert message + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + public void saveAlerts(String alertMessage) { + // Saves alert message in alerts table + TCAVESAlertsPersister.persist(alertMessage, tcaVESAlertsTable); + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java new file mode 100644 index 0000000..3023c90 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowlet.java @@ -0,0 +1,59 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.flow.flowlet.StreamEvent; +import com.google.common.base.Charsets; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; + +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY; + + +/** + * TCA Message Router Flowlet emits VES Message to {@link TCAVESThresholdViolationCalculatorFlowlet} instances + * + * @author Rajiv Singla . Creation Date: 11/14/2016. + */ +public class TCAVESMessageRouterFlowlet extends AbstractFlowlet { + + /** + * Emits ves message to TCA Calculator Instances + */ + @Output(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + protected OutputEmitter<String> vesMessageEmitter; + + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET); + } + + @ProcessInput + public void routeVESMessage(StreamEvent vesMessageStreamEvent) { + final String vesMessage = Charsets.UTF_8.decode(vesMessageStreamEvent.getBody()).toString(); + vesMessageEmitter.emit(vesMessage, TCA_VES_MESSAGE_ROUTER_PARTITION_KEY, vesMessage.hashCode()); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java new file mode 100644 index 0000000..b8460dc --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowlet.java @@ -0,0 +1,149 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.annotation.Output; +import co.cask.cdap.api.annotation.ProcessInput; +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.annotation.RoundRobin; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.AbstractFlowlet; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.metrics.Metrics; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister.persist; + +/** + * TCA VES Message Filter filters out messages which are not applicable for TCA as per TCA Policy + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class); + + @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) + protected OutputEmitter<String> tcaAlertOutputEmitter; + protected Metrics metrics; + private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; + + @Property + private final String messageStatusTableName; + private Boolean enableAlertCEFFormat; + + private TCAPolicy tcaPolicy; + + /** + * Creates an instance of TCA VES Threshold violation calculator flowlet with give message status table name + * + * @param messageStatusTableName message status table name + */ + public TCAVESThresholdViolationCalculatorFlowlet(String messageStatusTableName) { + this.messageStatusTableName = messageStatusTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET); + setDescription(CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET); + } + + + @Override + public void initialize(FlowletContext flowletContext) throws Exception { + super.initialize(flowletContext); + + // parse Runtime Arguments to tca policy preferences + tcaPolicy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext); + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + enableAlertCEFFormat = tcaAppPreferences.getEnableAlertCEFFormat(); + vesMessageStatusTable = getContext().getDataset(messageStatusTableName); + + } + + /** + * Filters VES Messages that violates TCA Policy + * + * @param vesMessage VES Message + * @throws JsonProcessingException if alert message cannot be parsed into JSON object + */ + @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT) + @RoundRobin + public void filterVESMessages(String vesMessage) throws JsonProcessingException { + + TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE; + String alertMessage = null; + + // Step 1: Filter incoming messages + final TCACEFProcessorContext processorContext = TCAUtils.filterCEFMessage(vesMessage, tcaPolicy); + + if (processorContext.canProcessingContinue()) { + + // Step 2: Check if CEF Message violate any thresholds + final TCACEFProcessorContext processorContextWithViolations = + TCAUtils.computeThresholdViolations(processorContext); + + if (processorContextWithViolations.canProcessingContinue()) { + + // Step 3: Create Alert Message + final String tcaAppName = getContext().getApplicationSpecification().getName(); + alertMessage = + TCAUtils.createTCAAlertString(processorContextWithViolations, tcaAppName, enableAlertCEFFormat); + calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT; + LOG.debug("VES Threshold Violation Detected. An alert message is be generated. {}", alertMessage); + + metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1); + + // Step 4: Emit message to Alert Sink Flowlet + tcaAlertOutputEmitter.emit(alertMessage); + + } else { + + calculatorMessageType = TCACalculatorMessageType.COMPLIANT; + metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1); + } + + } else { + + metrics.count(CDAPMetricsConstants.TCA_VES_INAPPLICABLE_MESSAGES_METRIC, 1); + } + + // save message to message status table + final int instanceId = getContext().getInstanceId(); + persist(processorContext, instanceId, calculatorMessageType, vesMessageStatusTable, alertMessage); + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java new file mode 100644 index 0000000..78ef877 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppConfig.java @@ -0,0 +1,96 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.settings; + +import com.google.common.base.Objects; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPBaseAppConfig; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; + + +/** + * Contains CDAP App Config Settings for TCA Application + * + * @author Rajiv Singla . Creation Date: 11/2/2016. + */ +public class TCAAppConfig extends CDAPBaseAppConfig { + + + private static final long serialVersionUID = 1L; + + protected String tcaSubscriberOutputStreamName; + protected Integer thresholdCalculatorFlowletInstances; + + protected String tcaVESMessageStatusTableName; + protected Integer tcaVESMessageStatusTableTTLSeconds; + protected String tcaVESAlertsTableName; + protected Integer tcaVESAlertsTableTTLSeconds; + + + public TCAAppConfig() { + appName = CDAPComponentsConstants.TCA_DEFAULT_NAME_APP; + appDescription = CDAPComponentsConstants.TCA_DEFAULT_DESCRIPTION_APP; + tcaSubscriberOutputStreamName = CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM; + thresholdCalculatorFlowletInstances = AnalyticsConstants.TCA_DEFAULT_THRESHOLD_CALCULATOR_FLOWLET_INSTANCES; + tcaVESMessageStatusTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_NAME_TABLE; + tcaVESMessageStatusTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_MESSAGE_STATUS_TTL_TABLE; + tcaVESAlertsTableName = CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_NAME_TABLE; + tcaVESAlertsTableTTLSeconds = AnalyticsConstants.TCA_DEFAULT_VES_ALERTS_TTL_TABLE; + } + + public String getTcaSubscriberOutputStreamName() { + return tcaSubscriberOutputStreamName; + } + + public String getTcaVESMessageStatusTableName() { + return tcaVESMessageStatusTableName; + } + + public Integer getTcaVESMessageStatusTableTTLSeconds() { + return tcaVESMessageStatusTableTTLSeconds; + } + + public String getTcaVESAlertsTableName() { + return tcaVESAlertsTableName; + } + + public Integer getTcaVESAlertsTableTTLSeconds() { + return tcaVESAlertsTableTTLSeconds; + } + + public Integer getThresholdCalculatorFlowletInstances() { + return thresholdCalculatorFlowletInstances; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("appName", appName) + .add("appDescription", appDescription) + .add("tcaSubscriberOutputStreamName", tcaSubscriberOutputStreamName) + .add("thresholdCalculatorFlowletInstances", thresholdCalculatorFlowletInstances) + .add("tcaVESMessageStatusTableName", tcaVESMessageStatusTableName) + .add("tcaVESMessageStatusTableTTLSeconds", tcaVESMessageStatusTableTTLSeconds) + .add("tcaVESAlertsTableName", tcaVESAlertsTableName) + .add("tcaVESAlertsTableTTLSeconds", tcaVESAlertsTableTTLSeconds) + .toString(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java new file mode 100644 index 0000000..c29b7ce --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java @@ -0,0 +1,223 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.settings; + +import com.google.common.base.Objects; +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; + +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS; + +/** + * <p> + * App Preferences for Analytics TCA (Threshold Crossing Alert) App + * <p> + * @author Rajiv Singla . Creation Date: 10/4/2016. + */ +public class TCAAppPreferences implements CDAPAppPreferences { + + private static final long serialVersionUID = 1L; + + // subscriber preferences + protected String subscriberHostName; + + protected Integer subscriberHostPort; + + protected String subscriberTopicName; + + protected String subscriberProtocol; + + protected String subscriberUserName; + + protected String subscriberUserPassword; + + protected String subscriberContentType; + + protected String subscriberConsumerId; + + protected String subscriberConsumerGroup; + + protected Integer subscriberTimeoutMS; + + protected Integer subscriberMessageLimit; + + protected Integer subscriberPollingInterval; + + // publisher preferences + protected String publisherHostName; + + protected Integer publisherHostPort; + + protected String publisherTopicName; + + protected String publisherProtocol; + + protected String publisherUserName; + + protected String publisherUserPassword; + + protected String publisherContentType; + + protected Integer publisherMaxBatchSize; + + protected Integer publisherMaxRecoveryQueueSize; + + protected Integer publisherPollingInterval; + + protected Boolean enableAlertCEFFormat; + + /** + * Default constructor to setup default values for TCA App Preferences + */ + public TCAAppPreferences() { + + // subscriber defaults + subscriberPollingInterval = TCA_DEFAULT_SUBSCRIBER_POLLING_INTERVAL_MS; + + // publisher defaults + publisherMaxBatchSize = TCA_DEFAULT_PUBLISHER_MAX_BATCH_QUEUE_SIZE; + publisherMaxRecoveryQueueSize = TCA_DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE; + publisherPollingInterval = TCA_DEFAULT_PUBLISHER_POLLING_INTERVAL_MS; + + enableAlertCEFFormat = false; + + } + + public String getSubscriberHostName() { + return subscriberHostName; + } + + public Integer getSubscriberHostPort() { + return subscriberHostPort; + } + + public String getSubscriberTopicName() { + return subscriberTopicName; + } + + public String getSubscriberProtocol() { + return subscriberProtocol; + } + + public String getSubscriberUserName() { + return subscriberUserName; + } + + public String getSubscriberUserPassword() { + return subscriberUserPassword; + } + + public String getSubscriberContentType() { + return subscriberContentType; + } + + public String getSubscriberConsumerId() { + return subscriberConsumerId; + } + + public String getSubscriberConsumerGroup() { + return subscriberConsumerGroup; + } + + public Integer getSubscriberTimeoutMS() { + return subscriberTimeoutMS; + } + + public Integer getSubscriberMessageLimit() { + return subscriberMessageLimit; + } + + public Integer getSubscriberPollingInterval() { + return subscriberPollingInterval; + } + + public String getPublisherHostName() { + return publisherHostName; + } + + public Integer getPublisherHostPort() { + return publisherHostPort; + } + + public String getPublisherTopicName() { + return publisherTopicName; + } + + public String getPublisherProtocol() { + return publisherProtocol; + } + + public String getPublisherUserName() { + return publisherUserName; + } + + public String getPublisherUserPassword() { + return publisherUserPassword; + } + + public String getPublisherContentType() { + return publisherContentType; + } + + public Integer getPublisherMaxBatchSize() { + return publisherMaxBatchSize; + } + + public Integer getPublisherMaxRecoveryQueueSize() { + return publisherMaxRecoveryQueueSize; + } + + public Integer getPublisherPollingInterval() { + return publisherPollingInterval; + } + + public Boolean getEnableAlertCEFFormat() { + return enableAlertCEFFormat; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("subscriberHostName", subscriberHostName) + .add("subscriberHostPort", subscriberHostPort) + .add("subscriberTopicName", subscriberTopicName) + .add("subscriberProtocol", subscriberProtocol) + .add("subscriberUserName", subscriberUserName) + .add("subscriberContentType", subscriberContentType) + .add("subscriberConsumerId", subscriberConsumerId) + .add("subscriberConsumerGroup", subscriberConsumerGroup) + .add("subscriberTimeoutMS", subscriberTimeoutMS) + .add("subscriberMessageLimit", subscriberMessageLimit) + .add("subscriberPollingInterval", subscriberPollingInterval) + .add("publisherHostName", publisherHostName) + .add("publisherHostPort", publisherHostPort) + .add("publisherTopicName", publisherTopicName) + .add("publisherProtocol", publisherProtocol) + .add("publisherUserName", publisherUserName) + .add("publisherContentType", publisherContentType) + .add("publisherMaxBatchSize", publisherMaxBatchSize) + .add("publisherMaxRecoveryQueueSize", publisherMaxRecoveryQueueSize) + .add("publisherPollingInterval", publisherPollingInterval) + .toString(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java new file mode 100644 index 0000000..9e27f22 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/settings/TCAPolicyPreferences.java @@ -0,0 +1,36 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.settings; + +import org.openecomp.dcae.apod.analytics.cdap.common.settings.CDAPAppPreferences; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; + +/** + * A wrapper over {@link TCAPolicy} to act as app Preferences as TCA Policy is passed + * by controller as runtime arguments from CDAP app preferences + * <p> + * @author Rajiv Singla . Creation Date: 11/29/2016. + */ +public class TCAPolicyPreferences extends TCAPolicy implements CDAPAppPreferences { + + private static final long serialVersionUID = 1L; + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java new file mode 100644 index 0000000..808b8ca --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToPublisherConfigMapper.java @@ -0,0 +1,97 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.utils; + +import com.google.common.base.Function; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + + +/** + * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} + * <p> + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +public class AppPreferencesToPublisherConfigMapper implements Function<TCAAppPreferences, DMaaPMRPublisherConfig> { + + /** + * Factory method to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object + * + * @param tcaAppPreferences tca App Preferences + * + * @return publisher config object + */ + public static DMaaPMRPublisherConfig map(final TCAAppPreferences tcaAppPreferences) { + return new AppPreferencesToPublisherConfigMapper().apply(tcaAppPreferences); + } + + /** + * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRPublisherConfig} object + * + * @param tcaAppPreferences tca App Preferences + * + * @return publisher config object + */ + @Nonnull + @Override + public DMaaPMRPublisherConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { + + // Create a new publisher settings builder + final DMaaPMRPublisherConfig.Builder publisherConfigBuilder = new DMaaPMRPublisherConfig.Builder( + tcaAppPreferences.getPublisherHostName(), tcaAppPreferences.getPublisherTopicName()); + + // Setup up any optional publisher parameters if they are present + final Integer publisherHostPort = tcaAppPreferences.getPublisherHostPort(); + if (publisherHostPort != null) { + publisherConfigBuilder.setPortNumber(publisherHostPort); + } + final String publisherProtocol = tcaAppPreferences.getPublisherProtocol(); + if (isPresent(publisherProtocol)) { + publisherConfigBuilder.setProtocol(publisherProtocol); + } + final String publisherUserName = tcaAppPreferences.getPublisherUserName(); + if (isPresent(publisherUserName)) { + publisherConfigBuilder.setUserName(publisherUserName); + } + final String publisherUserPassword = tcaAppPreferences.getPublisherUserPassword(); + if (isPresent(publisherUserPassword)) { + publisherConfigBuilder.setUserPassword(publisherUserPassword); + } + final String publisherContentType = tcaAppPreferences.getPublisherContentType(); + if (isPresent(publisherContentType)) { + publisherConfigBuilder.setContentType(publisherContentType); + } + final Integer publisherMaxBatchSize = tcaAppPreferences.getPublisherMaxBatchSize(); + if (publisherMaxBatchSize != null) { + publisherConfigBuilder.setMaxBatchSize(publisherMaxBatchSize); + } + final Integer publisherMaxRecoveryQueueSize = tcaAppPreferences.getPublisherMaxRecoveryQueueSize(); + if (publisherMaxRecoveryQueueSize != null) { + publisherConfigBuilder.setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize); + } + + return publisherConfigBuilder.build(); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java new file mode 100644 index 0000000..e017b81 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/AppPreferencesToSubscriberConfigMapper.java @@ -0,0 +1,113 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.utils; + +import com.google.common.base.Function; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; + +import javax.annotation.Nonnull; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent; + + +/** + * Function which translates {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} + * + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +public class AppPreferencesToSubscriberConfigMapper implements Function<TCAAppPreferences, DMaaPMRSubscriberConfig> { + + /** + * Factory Method to converts {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object + * + * @param tcaAppPreferences tca app preferences + * @return DMaaP Subscriber Config + */ + public static DMaaPMRSubscriberConfig map(final TCAAppPreferences tcaAppPreferences) { + return new AppPreferencesToSubscriberConfigMapper().apply(tcaAppPreferences); + } + + /** + * Implementation to convert {@link TCAAppPreferences} to {@link DMaaPMRSubscriberConfig} object + * + * @param tcaAppPreferences tca app preferences + * + * @return DMaaP Subscriber Config + */ + @Nonnull + @Override + public DMaaPMRSubscriberConfig apply(@Nonnull TCAAppPreferences tcaAppPreferences) { + + // Create a new subscriber settings builder + final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder( + tcaAppPreferences.getSubscriberHostName(), tcaAppPreferences.getSubscriberTopicName()); + + // Setup up any optional subscriber parameters if they are present + final Integer subscriberHostPortNumber = tcaAppPreferences.getSubscriberHostPort(); + if (subscriberHostPortNumber != null) { + subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber); + } + + final String subscriberProtocol = tcaAppPreferences.getSubscriberProtocol(); + if (isPresent(subscriberProtocol)) { + subscriberConfigBuilder.setProtocol(subscriberProtocol); + } + + final String subscriberUserName = tcaAppPreferences.getSubscriberUserName(); + if (isPresent(subscriberUserName)) { + subscriberConfigBuilder.setUserName(subscriberUserName); + } + + final String subscriberUserPassword = tcaAppPreferences.getSubscriberUserPassword(); + if (isPresent(subscriberUserPassword)) { + subscriberConfigBuilder.setUserPassword(subscriberUserPassword); + } + + final String subscriberContentType = tcaAppPreferences.getSubscriberContentType(); + if (isPresent(subscriberContentType)) { + subscriberConfigBuilder.setContentType(subscriberContentType); + } + + final String subscriberConsumerId = tcaAppPreferences.getSubscriberConsumerId(); + if (isPresent(subscriberConsumerId)) { + subscriberConfigBuilder.setConsumerId(subscriberConsumerId); + } + + final String subscriberConsumerGroup = tcaAppPreferences.getSubscriberConsumerGroup(); + if (isPresent(subscriberConsumerGroup)) { + subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup); + } + + final Integer subscriberTimeoutMS = tcaAppPreferences.getSubscriberTimeoutMS(); + if (subscriberTimeoutMS != null) { + subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS); + } + final Integer subscriberMessageLimit = tcaAppPreferences.getSubscriberMessageLimit(); + if (subscriberMessageLimit != null) { + subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit); + } + + // return Subscriber settings + return subscriberConfigBuilder.build(); + + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java new file mode 100644 index 0000000..29d42d5 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/utils/CDAPTCAUtils.java @@ -0,0 +1,171 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.utils; + +import co.cask.cdap.api.RuntimeContext; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator; +import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static com.google.common.collect.Lists.newArrayList; +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH; + +/** + * Utility Helper methods for CDAP TCA sub module. + * + * <p> + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +public abstract class CDAPTCAUtils extends TCAUtils { + + private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class); + + /** + * Function that extracts alert message string from {@link TCAVESAlertEntity} + */ + public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION = + new Function<TCAVESAlertEntity, String>() { + @Override + public String apply(TCAVESAlertEntity alertEntity) { + return alertEntity == null ? null : alertEntity.getAlertMessage(); + } + }; + + + /** + * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object + * + * @param runtimeContext Runtime Context + * + * @return validated runtime arguments as {@link TCAAppPreferences} object + */ + public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) { + // Parse runtime arguments + final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); + final TCAAppPreferences tcaAppPreferences = + ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class); + + // Validate runtime arguments + validateSettings(tcaAppPreferences, new TCAPreferencesValidator()); + + return tcaAppPreferences; + } + + + /** + * Extracts alert message strings from {@link TCAVESAlertEntity} + * + * @param alertEntities collection of alert entities + * + * @return List of alert message strings + */ + public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) { + return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION); + } + + + /** + * Converts Runtime Arguments to {@link TCAPolicyPreferences} object + * + * @param runtimeContext CDAP Runtime Arguments + * + * @return TCA Policy Preferences + */ + public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) { + + final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments(); + final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments); + + LOG.debug("Printing all Received Runtime Arguments:"); + for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) { + LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue()); + } + + TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences(); + + final String tcaPolicy = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY); + + if (tcaPolicy != null) { + + LOG.debug(" tcaPolicy is being read from JSON String"); + + // initialize unquotedTCAPolicy + String unquotedTCAPolicy = tcaPolicy; + + //remove starting and ending quote from tcaPolicy + if (tcaPolicy.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && tcaPolicy.trim().endsWith + (AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) { + unquotedTCAPolicy = tcaPolicy.trim().substring(1, tcaPolicy.trim().length() - 1); + } + + try { + tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class); + } catch (IOException e) { + throw new CDAPSettingsException("Invalid tca policy format", LOG, e); + } + + } else { // old controller is being used. Validate preferences as received from old controller + + // extract TCA Policy Domain from Runtime Arguments + final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH); + + // create new TCA Policy object + tcaPolicyPreferences.setDomain(policyDomain); + + // filter out other non relevant fields which are not related to tca policy + final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments, + TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH); + + // determine functional Roles + final Map<String, Map<String, String>> functionalRolesMap = + extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); + + // create metrics per functional role list + tcaPolicyPreferences.setMetricsPerFunctionalRole( + createTCAPolicyMetricsPerFunctionalRoleList(functionalRolesMap)); + + } + + // validate tca Policy Preferences + validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator()); + + LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences); + + return tcaPolicyPreferences; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java new file mode 100644 index 0000000..23e4c8a --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAAppConfigValidator.java @@ -0,0 +1,62 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.validator; + +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * <p> + * TCA App Config Validator validates any TCA App Config parameter values + * </p> + * + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +public class TCAAppConfigValidator implements CDAPAppSettingsValidator<TCAAppConfig, + GenericValidationResponse<TCAAppConfig>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAAppConfig> validateAppSettings(TCAAppConfig tcaAppConfig) { + + final GenericValidationResponse<TCAAppConfig> validationResponse = new GenericValidationResponse<>(); + + if (isEmpty(tcaAppConfig.getTcaSubscriberOutputStreamName())) { + validationResponse.addErrorMessage("tcaSubscriberOutputStreamName", + "tcaSubscriberOutputStreamName must be present"); + } + + if (isEmpty(tcaAppConfig.getTcaVESMessageStatusTableName())) { + validationResponse.addErrorMessage("tcaVESMessageStatusTableName", + "tcaVESMessageStatusTableName must be present"); + } + if (isEmpty(tcaAppConfig.getTcaVESAlertsTableName())) { + validationResponse.addErrorMessage("tcaVESAlertsTableName", + "tcaVESAlertsTableName must be present"); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java new file mode 100644 index 0000000..858204a --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPolicyPreferencesValidator.java @@ -0,0 +1,96 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.validator; + +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; + +import java.util.List; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * Validates TCA Policy Preferences + * <p> + * @author Rajiv Singla . Creation Date: 11/29/2016. + */ +public class TCAPolicyPreferencesValidator implements CDAPAppSettingsValidator<TCAPolicyPreferences, + GenericValidationResponse<TCAPolicyPreferences>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAPolicyPreferences> validateAppSettings( + final TCAPolicyPreferences tcaPolicyPreferences) { + + final GenericValidationResponse<TCAPolicyPreferences> validationResponse = new GenericValidationResponse<>(); + + // validate TCA Policy must domain present + final String domain = tcaPolicyPreferences.getDomain(); + if (isEmpty(domain)) { + validationResponse.addErrorMessage("domain", "TCA Policy must have only one domain present"); + } + + // validate TCA Policy must have at least one functional role + final List<String> policyFunctionalRoles = TCAUtils.getPolicyFunctionalRoles(tcaPolicyPreferences); + if (policyFunctionalRoles.isEmpty()) { + validationResponse.addErrorMessage("metricsPerFunctionalRoles", + "TCA Policy must have at least one or more functional roles"); + } + + final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles = + tcaPolicyPreferences.getMetricsPerFunctionalRole(); + + // validate each Functional Role must have at least one threshold + for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) { + if (metricsPerFunctionalRole.getThresholds().isEmpty()) { + validationResponse.addErrorMessage("thresholds", + "TCA Policy Functional Role must have at least one threshold. " + + "Functional Role causing this validation error:" + metricsPerFunctionalRole); + } + } + + // validate each threshold must have non null - fieldPath, thresholdValue, direction and severity + for (MetricsPerFunctionalRole metricsPerFunctionalRole : metricsPerFunctionalRoles) { + final List<Threshold> functionalRoleThresholds = metricsPerFunctionalRole.getThresholds(); + for (Threshold functionalRoleThreshold : functionalRoleThresholds) { + final String fieldPath = functionalRoleThreshold.getFieldPath(); + final Long thresholdValue = functionalRoleThreshold.getThresholdValue(); + final Direction direction = functionalRoleThreshold.getDirection(); + final EventSeverity severity = functionalRoleThreshold.getSeverity(); + if (isEmpty(fieldPath) || thresholdValue == null || direction == null || severity == null) { + validationResponse.addErrorMessage("threshold", + "TCA Policy threshold must have fieldPath, thresholdValue, direction and severity present." + + "Threshold causing this validation error:" + functionalRoleThreshold); + } + } + } + + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java new file mode 100644 index 0000000..c74463b --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java @@ -0,0 +1,65 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.validator; + +import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.common.validation.GenericValidationResponse; + +import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty; + +/** + * + * @author Rajiv Singla . Creation Date: 11/3/2016. + */ +public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppPreferences, + GenericValidationResponse<TCAAppPreferences>> { + + private static final long serialVersionUID = 1L; + + @Override + public GenericValidationResponse<TCAAppPreferences> validateAppSettings(TCAAppPreferences appPreferences) { + + final GenericValidationResponse<TCAAppPreferences> validationResponse = new GenericValidationResponse<>(); + + // subscriber validations + final String subscriberHostName = appPreferences.getSubscriberHostName(); + if (isEmpty(subscriberHostName)) { + validationResponse.addErrorMessage("subscriberHostName", "Subscriber host name must be present"); + } + final String subscriberTopicName = appPreferences.getSubscriberTopicName(); + if (isEmpty(subscriberTopicName)) { + validationResponse.addErrorMessage("subscriberTopicName", "Subscriber topic name must be present"); + } + + // publisher validations + final String publisherHostName = appPreferences.getPublisherHostName(); + if (isEmpty(publisherHostName)) { + validationResponse.addErrorMessage("publisherHostName", "Publisher host name must be present"); + } + final String publisherTopicName = appPreferences.getPublisherTopicName(); + if (isEmpty(publisherTopicName)) { + validationResponse.addErrorMessage("publisherTopicName", "Publisher topic name must be present"); + } + + return validationResponse; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java new file mode 100644 index 0000000..6623321 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java @@ -0,0 +1,116 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.worker.AbstractWorker; +import com.google.common.base.Preconditions; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.String.format; + +/** + * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals + * <p> + * @author Rajiv Singla . Creation Date: 12/19/2016. + */ +public abstract class BaseTCADMaaPMRWorker extends AbstractWorker { + + private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class); + + /** + * Quartz Scheduler + */ + protected Scheduler scheduler; + /** + * Determines if scheduler is shutdown + */ + protected AtomicBoolean isSchedulerShutdown; + + + @Override + public void run() { + + Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); + String schedulerName = ""; + + // Start scheduler + try { + schedulerName = scheduler.getSchedulerName(); + scheduler.start(); + isSchedulerShutdown.getAndSet(false); + + } catch (SchedulerException e) { + final String errorMessage = + format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName); + + // indefinite loop which wakes up and confirms scheduler is indeed running + while (!isSchedulerShutdown.get()) { + try { + + Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS); + + } catch (InterruptedException e) { + + final String errorMessage = + format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s", + schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName); + + } + + @Override + public void stop() { + + Preconditions.checkNotNull(scheduler, "Scheduler must not be null"); + String schedulerName = ""; + + // Stop Scheduler + try { + schedulerName = scheduler.getSchedulerName(); + LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName); + scheduler.shutdown(); + isSchedulerShutdown.getAndSet(true); + + } catch (SchedulerException e) { + + final String errorMessage = + format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java new file mode 100644 index 0000000..ce4ccbe --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java @@ -0,0 +1,200 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.TxRunnable; +import co.cask.cdap.api.common.Bytes; +import co.cask.cdap.api.data.DatasetContext; +import co.cask.cdap.api.dataset.lib.CloseableIterator; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import com.google.common.base.Joiner; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.tephra.TransactionFailureException; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils; +import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME; +import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME; + +/** + * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to + * DMaaP MR topic + *<p> + * @author Rajiv Singla . Creation Date: 11/17/2016. + */ +@DisallowConcurrentExecution +@PersistJobDataAfterExecution +@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") +public class TCADMaaPMRPublisherJob implements Job { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class); + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}", + jobExecutionContext.getNextFireTime()); + + // Get Job Data Map + final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); + + // Fetch all Job Params from Job Data Map + final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME); + final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME); + final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME); + final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME); + + LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName); + + // Get new alerts from alerts table + final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext); + + // If no new alerts are found - nothing to publish + if (newAlertsMap.isEmpty()) { + LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1); + return; + } + + final int newAlertsCount = newAlertsMap.size(); + LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName, + newAlertsCount); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount); + + // Get alert message strings from alert Entities + final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values()); + + // Publish messages to DMaaP MR Topic + try { + + final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages); + + final Integer responseCode = publisherResponse.getResponseCode(); + final String responseMessage = publisherResponse.getResponseMessage(); + final int pendingMessagesCount = publisherResponse.getPendingMessagesCount(); + + LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode, + responseMessage, pendingMessagesCount); + + if (HTTPUtils.isSuccessfulResponseCode(responseCode)) { + LOG.debug("Successfully Published alerts to DMaaP MR Topic."); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); + } else { + LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later...."); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1); + } + + } catch (DCAEAnalyticsRuntimeException e) { + LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e); + } finally { + // delete send message from alerts table + deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics); + } + + LOG.debug("Finished DMaaP MR Topic Publisher fetch Job."); + + } + + /** + * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values + * + * @param cdapAlertsTableName alerts table name + * @param workerContext worker context + * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values + */ + protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName, + final WorkerContext workerContext) { + final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>(); + try { + workerContext.execute(new TxRunnable() { + @Override + public void run(DatasetContext context) throws Exception { + final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); + final Date currentTime = new Date(); + final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime); + final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey); + while (scan.hasNext()) { + final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next(); + newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue()); + } + } + }); + } catch (TransactionFailureException e) { + final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + return newAlertsMap; + } + + /** + * Deletes rows in Alerts table for give rowKeys + * + * @param cdapAlertsTableName CDAP Alerts Table Name + * @param workerContext Worker Context + * @param rowKeys Row Key Set + * @param metrics CDAP metrics + */ + protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext, + final Set<String> rowKeys, final Metrics metrics) { + LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys)); + try { + workerContext.execute(new TxRunnable() { + @Override + public void run(DatasetContext context) throws Exception { + final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName); + for (String rowKey : rowKeys) { + alertsTable.delete(rowKey); + metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1); + } + } + }); + } catch (TransactionFailureException e) { + final String errorMessage = + "Transaction Error while deleting published alerts in alerts table: " + e.toString(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java new file mode 100644 index 0000000..9fb9d83 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java @@ -0,0 +1,114 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import com.google.common.base.Optional; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants; +import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static java.lang.String.format; + +/** + * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to + * a given CDAP Stream + * + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +@DisallowConcurrentExecution +@PersistJobDataAfterExecution +public class TCADMaaPMRSubscriberJob implements Job { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class); + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}", + jobExecutionContext.getNextFireTime()); + + // Get Job Data Map + final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); + + // Fetch all Job Params from Job Data Map + final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME); + final WorkerContext workerContext = + (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME); + final DMaaPMRSubscriber subscriber = + (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME); + final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME); + + final Optional<List<String>> subscriberMessagesOptional = + DMaaPMRUtils.getSubscriberMessages(subscriber, metrics); + + // Write message to CDAP Stream using Stream Writer + if (subscriberMessagesOptional.isPresent()) { + writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics); + } + } + + + /** + * Writes given messages to CDAP Stream + * + * @param actualMessages List of messages that need to written to cdap stream + * @param cdapStreamName cdap stream name + * @param workerContext cdap worker context + * @param metrics cdap metrics + */ + private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName, + final WorkerContext workerContext, final Metrics metrics) { + LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size()); + try { + + for (String message : actualMessages) { + workerContext.write(cdapStreamName, message); + } + + } catch (IOException e) { + metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1); + final String errorMessage = + format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " + + "Exception: %s", cdapStreamName, e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}", + cdapStreamName, actualMessages.size()); + + } + +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java new file mode 100644 index 0000000..4d721e2 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java @@ -0,0 +1,141 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.worker.AbstractWorker; +import co.cask.cdap.api.worker.WorkerContext; +import com.fasterxml.jackson.core.type.TypeReference; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue; +import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString; + +/** + * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic. + * The mock instead of making DMaaP MR calls will actually take messages + * from file and send them to stream at subscriber polling interval + * + * TODO: To be removed before going to production - only for testing purposes + * + * @author Rajiv Singla . Creation Date: 11/4/2016. + */ +public class TCADMaaPMockSubscriberWorker extends AbstractWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class); + + // TODO: Remove this file before going to production - only for mocking purposes + private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json"; + private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE = + new TypeReference<List<EventListener>>() { + }; + + private TCAAppPreferences tcaAppPreferences; + private boolean stopSendingMessages; + @Property + private final String tcaSubscriberOutputStreamName; + + public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) { + this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; + } + + @Override + public void configure() { + setName("MockTCASubscriberWorker"); + setDescription("Writes Mocked VES messages to CDAP Stream"); + LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker"); + } + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences); + this.tcaAppPreferences = appPreferences; + this.stopSendingMessages = false; + } + + + @Override + public void run() { + final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); + LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval); + + final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream + (MOCK_MESSAGE_FILE_LOCATION); + + if (resourceAsStream == null) { + LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION); + throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException()); + } + + + try { + List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE); + + final int totalMessageCount = eventListeners.size(); + LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount); + + int i = 1; + for (EventListener eventListener : eventListeners) { + if (stopSendingMessages) { + LOG.debug("Stop sending messages......"); + break; + } + final String eventListenerString = writeValueAsString(eventListener); + LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount); + getContext().write(tcaSubscriberOutputStreamName, eventListenerString); + i++; + + try { + Thread.sleep(subscriberPollingInterval); + } catch (InterruptedException e) { + LOG.error("Error while sleeping"); + throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e); + } + } + + LOG.debug("Finished writing mock messages to CDAP Stream"); + + } catch (IOException e) { + LOG.error("Error while parsing json file"); + throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e); + } + + + } + + @Override + public void stop() { + stopSendingMessages = true; + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java new file mode 100644 index 0000000..90e458a --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java @@ -0,0 +1,146 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToPublisherConfigMapper; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.quartz.JobDataMap; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.String.format; + +/** + * TCA DMaaP Publisher will monitor alerts table at regular intervals and publish any alerts to DMaaP MR Publishing + * Topic + * <p> + * @author Rajiv Singla . Creation Date: 11/16/2016. + */ +public class TCADMaaPPublisherWorker extends BaseTCADMaaPMRWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPPublisherWorker.class); + + private DMaaPMRPublisher publisher; + private Metrics metrics; + @Property + private final String tcaVESAlertsTableName; + + public TCADMaaPPublisherWorker(final String tcaVESAlertsTableName) { + this.tcaVESAlertsTableName = tcaVESAlertsTableName; + } + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); + setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_DESCRIPTION_WORKER); + LOG.debug("Configuring TCA MR DMaaP Publisher worker with name: {}", + CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER); + } + + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + + LOG.info("Initializing TCA MR DMaaP Publisher worker with preferences: {}", tcaAppPreferences); + + // Map TCA App Preferences to DMaaP MR Publisher Config + final DMaaPMRPublisherConfig publisherConfig = AppPreferencesToPublisherConfigMapper.map(tcaAppPreferences); + + LOG.info("TCA DMaaP MR Publisher worker will be polling TCA Alerts Table Name: {}", tcaVESAlertsTableName); + + // Create an instance of DMaaP MR Publisher + LOG.debug("Creating an instance of DMaaP Publisher"); + publisher = DMaaPMRFactory.create().createPublisher(publisherConfig); + + // initialize a new Quartz scheduler + initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); + + // initialize scheduler state + isSchedulerShutdown = new AtomicBoolean(true); + } + + + /** + * Stop DMaaP Publisher + */ + @Override + public void stop() { + // Close Publisher - which will flush any batch messages if present in batch queue + if (publisher != null) { + try { + publisher.close(); + } catch (Exception e) { + final String errorMessage = format("Error while shutting down DMaaP MR Publisher: %s", e); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + // Shut down scheduler + super.stop(); + } + + + /** + * Initializes a scheduler instance for DMaaP MR Publisher Job + * + * @throws SchedulerException SchedulerException + */ + private void initializeScheduler(final TCAAppPreferences tcaAnalyticsAppConfig, + final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { + + // Get Publisher polling interval + final Integer publisherPollingInterval = tcaAnalyticsAppConfig.getPublisherPollingInterval(); + + // Publisher Quartz Properties file + final String quartzPublisherPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_PUBLISHER_PROPERTIES_FILE_NAME; + + // Create a new JobDataMap containing information required by TCA DMaaP Publisher Job + final JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME, tcaVESAlertsTableName); + jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); + jobDataMap.put(AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME, publisher); + jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); + + // Create new publisher scheduler + scheduler = TCAUtils.createQuartzScheduler(publisherPollingInterval, stdSchedulerFactory, + quartzPublisherPropertiesFileName, jobDataMap, TCADMaaPMRPublisherJob.class, + AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_JOB_NAME, + AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_TRIGGER_NAME); + } +} diff --git a/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java new file mode 100644 index 0000000..900d62e --- /dev/null +++ b/dcae-analytics-cdap-tca/src/main/java/org/openecomp/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java @@ -0,0 +1,124 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.tca.worker; + +import co.cask.cdap.api.annotation.Property; +import co.cask.cdap.api.metrics.Metrics; +import co.cask.cdap.api.worker.WorkerContext; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToSubscriberConfigMapper; +import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; +import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; +import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber; +import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; +import org.quartz.JobDataMap; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * TCA DMaaP Subscriber will read messages and post them to cdap stream at regular intervals + * <p> + * @author Rajiv Singla . Creation Date: 10/14/2016. + */ +public class TCADMaaPSubscriberWorker extends BaseTCADMaaPMRWorker { + + private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPSubscriberWorker.class); + + private DMaaPMRSubscriber subscriber; + private Metrics metrics; + @Property + private final String tcaSubscriberOutputStreamName; + + public TCADMaaPSubscriberWorker(final String tcaSubscriberOutputStreamName) { + this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName; + } + + + @Override + public void configure() { + setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); + setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_DESCRIPTION_WORKER); + LOG.debug("Configuring TCA MR DMaaP Subscriber worker with name: {}", + CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER); + } + + @Override + public void initialize(WorkerContext context) throws Exception { + super.initialize(context); + + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context); + + LOG.info("Initializing TCA MR DMaaP Subscriber worker with preferences: {}", tcaAppPreferences); + + // Map TCA App Preferences to DMaaP MR Subscriber Config + final DMaaPMRSubscriberConfig subscriberConfig = AppPreferencesToSubscriberConfigMapper.map(tcaAppPreferences); + + LOG.info("TCA DMaaP MR Subscriber worker will be writing to CDAP Stream: {}", tcaSubscriberOutputStreamName); + + // Create an instance of DMaaP MR Subscriber + LOG.debug("Creating an instance of DMaaP Subscriber"); + subscriber = DMaaPMRFactory.create().createSubscriber(subscriberConfig); + + // initialize a new Quartz scheduler + initializeScheduler(tcaAppPreferences, new StdSchedulerFactory()); + + // initialize scheduler state + isSchedulerShutdown = new AtomicBoolean(true); + } + + /** + * Initializes a scheduler instance for DMaaP MR Subscriber Job + * + * @throws SchedulerException SchedulerException + */ + private void initializeScheduler(final TCAAppPreferences tcaAppPreferences, + final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException { + + // Get Subscriber polling interval + final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval(); + + // Subscriber Quartz Properties file + final String quartzSubscriberPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_SUBSCRIBER_PROPERTIES_FILE_NAME; + + // Create a new JobDataMap containing information required by TCA DMaaP Subscriber Job + final JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME, tcaSubscriberOutputStreamName); + jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext()); + jobDataMap.put(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME, subscriber); + jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics); + + // Create new publisher scheduler + scheduler = TCAUtils.createQuartzScheduler(subscriberPollingInterval, stdSchedulerFactory, + quartzSubscriberPropertiesFileName, jobDataMap, TCADMaaPMRSubscriberJob.class, + AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_JOB_NAME, + AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_TRIGGER_NAME); + } + + +} |