aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker')
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java116
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java200
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java114
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java141
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java146
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java124
6 files changed, 841 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java
new file mode 100644
index 0000000..f9deac8
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/BaseTCADMaaPMRWorker.java
@@ -0,0 +1,116 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.worker.AbstractWorker;
+import com.google.common.base.Preconditions;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+
+/**
+ * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals
+ * <p>
+ * @author Rajiv Singla . Creation Date: 12/19/2016.
+ */
+public abstract class BaseTCADMaaPMRWorker extends AbstractWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class);
+
+ /**
+ * Quartz Scheduler
+ */
+ protected Scheduler scheduler;
+ /**
+ * Determines if scheduler is shutdown
+ */
+ protected AtomicBoolean isSchedulerShutdown;
+
+
+ @Override
+ public void run() {
+
+ Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
+ String schedulerName = "";
+
+ // Start scheduler
+ try {
+ schedulerName = scheduler.getSchedulerName();
+ scheduler.start();
+ isSchedulerShutdown.getAndSet(false);
+
+ } catch (SchedulerException e) {
+ final String errorMessage =
+ format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName);
+
+ // indefinite loop which wakes up and confirms scheduler is indeed running
+ while (!isSchedulerShutdown.get()) {
+ try {
+
+ Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS);
+
+ } catch (InterruptedException e) {
+
+ final String errorMessage =
+ format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s",
+ schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName);
+
+ }
+
+ @Override
+ public void stop() {
+
+ Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
+ String schedulerName = "";
+
+ // Stop Scheduler
+ try {
+ schedulerName = scheduler.getSchedulerName();
+ LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName);
+ scheduler.shutdown();
+ isSchedulerShutdown.getAndSet(true);
+
+ } catch (SchedulerException e) {
+
+ final String errorMessage =
+ format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java
new file mode 100644
index 0000000..2114c8c
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRPublisherJob.java
@@ -0,0 +1,200 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.TxRunnable;
+import co.cask.cdap.api.common.Bytes;
+import co.cask.cdap.api.data.DatasetContext;
+import co.cask.cdap.api.dataset.lib.CloseableIterator;
+import co.cask.cdap.api.dataset.lib.KeyValue;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.google.common.base.Joiner;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.tephra.TransactionFailureException;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME;
+import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME;
+import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME;
+import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME;
+
+/**
+ * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to
+ * DMaaP MR topic
+ *<p>
+ * @author Rajiv Singla . Creation Date: 11/17/2016.
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
+public class TCADMaaPMRPublisherJob implements Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class);
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+ LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}",
+ jobExecutionContext.getNextFireTime());
+
+ // Get Job Data Map
+ final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
+
+ // Fetch all Job Params from Job Data Map
+ final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME);
+ final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME);
+ final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME);
+ final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME);
+
+ LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName);
+
+ // Get new alerts from alerts table
+ final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext);
+
+ // If no new alerts are found - nothing to publish
+ if (newAlertsMap.isEmpty()) {
+ LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1);
+ return;
+ }
+
+ final int newAlertsCount = newAlertsMap.size();
+ LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName,
+ newAlertsCount);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount);
+
+ // Get alert message strings from alert Entities
+ final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values());
+
+ // Publish messages to DMaaP MR Topic
+ try {
+
+ final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages);
+
+ final Integer responseCode = publisherResponse.getResponseCode();
+ final String responseMessage = publisherResponse.getResponseMessage();
+ final int pendingMessagesCount = publisherResponse.getPendingMessagesCount();
+
+ LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode,
+ responseMessage, pendingMessagesCount);
+
+ if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("Successfully Published alerts to DMaaP MR Topic.");
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
+ } else {
+ LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later....");
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
+ }
+
+ } catch (DCAEAnalyticsRuntimeException e) {
+ LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e);
+ } finally {
+ // delete send message from alerts table
+ deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics);
+ }
+
+ LOG.debug("Finished DMaaP MR Topic Publisher fetch Job.");
+
+ }
+
+ /**
+ * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values
+ *
+ * @param cdapAlertsTableName alerts table name
+ * @param workerContext worker context
+ * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values
+ */
+ protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName,
+ final WorkerContext workerContext) {
+ final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>();
+ try {
+ workerContext.execute(new TxRunnable() {
+ @Override
+ public void run(DatasetContext context) throws Exception {
+ final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
+ final Date currentTime = new Date();
+ final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime);
+ final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey);
+ while (scan.hasNext()) {
+ final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next();
+ newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue());
+ }
+ }
+ });
+ } catch (TransactionFailureException e) {
+ final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ return newAlertsMap;
+ }
+
+ /**
+ * Deletes rows in Alerts table for give rowKeys
+ *
+ * @param cdapAlertsTableName CDAP Alerts Table Name
+ * @param workerContext Worker Context
+ * @param rowKeys Row Key Set
+ * @param metrics CDAP metrics
+ */
+ protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext,
+ final Set<String> rowKeys, final Metrics metrics) {
+ LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys));
+ try {
+ workerContext.execute(new TxRunnable() {
+ @Override
+ public void run(DatasetContext context) throws Exception {
+ final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
+ for (String rowKey : rowKeys) {
+ alertsTable.delete(rowKey);
+ metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1);
+ }
+ }
+ });
+ } catch (TransactionFailureException e) {
+ final String errorMessage =
+ "Transaction Error while deleting published alerts in alerts table: " + e.toString();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java
new file mode 100644
index 0000000..1714d65
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMRSubscriberJob.java
@@ -0,0 +1,114 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.google.common.base.Optional;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.onap.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to
+ * a given CDAP Stream
+ *
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+public class TCADMaaPMRSubscriberJob implements Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class);
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+ LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}",
+ jobExecutionContext.getNextFireTime());
+
+ // Get Job Data Map
+ final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
+
+ // Fetch all Job Params from Job Data Map
+ final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME);
+ final WorkerContext workerContext =
+ (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME);
+ final DMaaPMRSubscriber subscriber =
+ (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME);
+ final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME);
+
+ final Optional<List<String>> subscriberMessagesOptional =
+ DMaaPMRUtils.getSubscriberMessages(subscriber, metrics);
+
+ // Write message to CDAP Stream using Stream Writer
+ if (subscriberMessagesOptional.isPresent()) {
+ writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics);
+ }
+ }
+
+
+ /**
+ * Writes given messages to CDAP Stream
+ *
+ * @param actualMessages List of messages that need to written to cdap stream
+ * @param cdapStreamName cdap stream name
+ * @param workerContext cdap worker context
+ * @param metrics cdap metrics
+ */
+ private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName,
+ final WorkerContext workerContext, final Metrics metrics) {
+ LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size());
+ try {
+
+ for (String message : actualMessages) {
+ workerContext.write(cdapStreamName, message);
+ }
+
+ } catch (IOException e) {
+ metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1);
+ final String errorMessage =
+ format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " +
+ "Exception: %s", cdapStreamName, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}",
+ cdapStreamName, actualMessages.size());
+
+ }
+
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java
new file mode 100644
index 0000000..e8130f3
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPMockSubscriberWorker.java
@@ -0,0 +1,141 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.worker.AbstractWorker;
+import co.cask.cdap.api.worker.WorkerContext;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.onap.dcae.apod.analytics.tca.utils.TCAUtils.readValue;
+import static org.onap.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString;
+
+/**
+ * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic.
+ * The mock instead of making DMaaP MR calls will actually take messages
+ * from file and send them to stream at subscriber polling interval
+ *
+ * TODO: To be removed before going to production - only for testing purposes
+ *
+ * @author Rajiv Singla . Creation Date: 11/4/2016.
+ */
+public class TCADMaaPMockSubscriberWorker extends AbstractWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class);
+
+ // TODO: Remove this file before going to production - only for mocking purposes
+ private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
+ private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
+ new TypeReference<List<EventListener>>() {
+ };
+
+ private TCAAppPreferences tcaAppPreferences;
+ private boolean stopSendingMessages;
+ @Property
+ private final String tcaSubscriberOutputStreamName;
+
+ public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) {
+ this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
+ }
+
+ @Override
+ public void configure() {
+ setName("MockTCASubscriberWorker");
+ setDescription("Writes Mocked VES messages to CDAP Stream");
+ LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker");
+ }
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+ LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences);
+ this.tcaAppPreferences = appPreferences;
+ this.stopSendingMessages = false;
+ }
+
+
+ @Override
+ public void run() {
+ final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
+ LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval);
+
+ final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream
+ (MOCK_MESSAGE_FILE_LOCATION);
+
+ if (resourceAsStream == null) {
+ LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
+ throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
+ }
+
+
+ try {
+ List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
+
+ final int totalMessageCount = eventListeners.size();
+ LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
+
+ int i = 1;
+ for (EventListener eventListener : eventListeners) {
+ if (stopSendingMessages) {
+ LOG.debug("Stop sending messages......");
+ break;
+ }
+ final String eventListenerString = writeValueAsString(eventListener);
+ LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
+ getContext().write(tcaSubscriberOutputStreamName, eventListenerString);
+ i++;
+
+ try {
+ Thread.sleep(subscriberPollingInterval);
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping");
+ throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
+ }
+ }
+
+ LOG.debug("Finished writing mock messages to CDAP Stream");
+
+ } catch (IOException e) {
+ LOG.error("Error while parsing json file");
+ throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
+ }
+
+
+ }
+
+ @Override
+ public void stop() {
+ stopSendingMessages = true;
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java
new file mode 100644
index 0000000..78dbd35
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPPublisherWorker.java
@@ -0,0 +1,146 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToPublisherConfigMapper;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
+import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.quartz.JobDataMap;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+
+/**
+ * TCA DMaaP Publisher will monitor alerts table at regular intervals and publish any alerts to DMaaP MR Publishing
+ * Topic
+ * <p>
+ * @author Rajiv Singla . Creation Date: 11/16/2016.
+ */
+public class TCADMaaPPublisherWorker extends BaseTCADMaaPMRWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPPublisherWorker.class);
+
+ private DMaaPMRPublisher publisher;
+ private Metrics metrics;
+ @Property
+ private final String tcaVESAlertsTableName;
+
+ public TCADMaaPPublisherWorker(final String tcaVESAlertsTableName) {
+ this.tcaVESAlertsTableName = tcaVESAlertsTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_DESCRIPTION_WORKER);
+ LOG.debug("Configuring TCA MR DMaaP Publisher worker with name: {}",
+ CDAPComponentsConstants.TCA_FIXED_DMAAP_PUBLISHER_WORKER);
+ }
+
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+
+ LOG.info("Initializing TCA MR DMaaP Publisher worker with preferences: {}", tcaAppPreferences);
+
+ // Map TCA App Preferences to DMaaP MR Publisher Config
+ final DMaaPMRPublisherConfig publisherConfig = AppPreferencesToPublisherConfigMapper.map(tcaAppPreferences);
+
+ LOG.info("TCA DMaaP MR Publisher worker will be polling TCA Alerts Table Name: {}", tcaVESAlertsTableName);
+
+ // Create an instance of DMaaP MR Publisher
+ LOG.debug("Creating an instance of DMaaP Publisher");
+ publisher = DMaaPMRFactory.create().createPublisher(publisherConfig);
+
+ // initialize a new Quartz scheduler
+ initializeScheduler(tcaAppPreferences, new StdSchedulerFactory());
+
+ // initialize scheduler state
+ isSchedulerShutdown = new AtomicBoolean(true);
+ }
+
+
+ /**
+ * Stop DMaaP Publisher
+ */
+ @Override
+ public void stop() {
+ // Close Publisher - which will flush any batch messages if present in batch queue
+ if (publisher != null) {
+ try {
+ publisher.close();
+ } catch (Exception e) {
+ final String errorMessage = format("Error while shutting down DMaaP MR Publisher: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+ // Shut down scheduler
+ super.stop();
+ }
+
+
+ /**
+ * Initializes a scheduler instance for DMaaP MR Publisher Job
+ *
+ * @throws SchedulerException SchedulerException
+ */
+ private void initializeScheduler(final TCAAppPreferences tcaAnalyticsAppConfig,
+ final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException {
+
+ // Get Publisher polling interval
+ final Integer publisherPollingInterval = tcaAnalyticsAppConfig.getPublisherPollingInterval();
+
+ // Publisher Quartz Properties file
+ final String quartzPublisherPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_PUBLISHER_PROPERTIES_FILE_NAME;
+
+ // Create a new JobDataMap containing information required by TCA DMaaP Publisher Job
+ final JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME, tcaVESAlertsTableName);
+ jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext());
+ jobDataMap.put(AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME, publisher);
+ jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics);
+
+ // Create new publisher scheduler
+ scheduler = TCAUtils.createQuartzScheduler(publisherPollingInterval, stdSchedulerFactory,
+ quartzPublisherPropertiesFileName, jobDataMap, TCADMaaPMRPublisherJob.class,
+ AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_JOB_NAME,
+ AnalyticsConstants.TCA_DMAAP_PUBLISHER_QUARTZ_TRIGGER_NAME);
+ }
+}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java
new file mode 100644
index 0000000..64bf0d1
--- /dev/null
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/worker/TCADMaaPSubscriberWorker.java
@@ -0,0 +1,124 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.worker;
+
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.metrics.Metrics;
+import co.cask.cdap.api.worker.WorkerContext;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.AppPreferencesToSubscriberConfigMapper;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.quartz.JobDataMap;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TCA DMaaP Subscriber will read messages and post them to cdap stream at regular intervals
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/14/2016.
+ */
+public class TCADMaaPSubscriberWorker extends BaseTCADMaaPMRWorker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPSubscriberWorker.class);
+
+ private DMaaPMRSubscriber subscriber;
+ private Metrics metrics;
+ @Property
+ private final String tcaSubscriberOutputStreamName;
+
+ public TCADMaaPSubscriberWorker(final String tcaSubscriberOutputStreamName) {
+ this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
+ }
+
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_DESCRIPTION_WORKER);
+ LOG.debug("Configuring TCA MR DMaaP Subscriber worker with name: {}",
+ CDAPComponentsConstants.TCA_FIXED_DMAAP_SUBSCRIBER_WORKER);
+ }
+
+ @Override
+ public void initialize(WorkerContext context) throws Exception {
+ super.initialize(context);
+
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
+
+ LOG.info("Initializing TCA MR DMaaP Subscriber worker with preferences: {}", tcaAppPreferences);
+
+ // Map TCA App Preferences to DMaaP MR Subscriber Config
+ final DMaaPMRSubscriberConfig subscriberConfig = AppPreferencesToSubscriberConfigMapper.map(tcaAppPreferences);
+
+ LOG.info("TCA DMaaP MR Subscriber worker will be writing to CDAP Stream: {}", tcaSubscriberOutputStreamName);
+
+ // Create an instance of DMaaP MR Subscriber
+ LOG.debug("Creating an instance of DMaaP Subscriber");
+ subscriber = DMaaPMRFactory.create().createSubscriber(subscriberConfig);
+
+ // initialize a new Quartz scheduler
+ initializeScheduler(tcaAppPreferences, new StdSchedulerFactory());
+
+ // initialize scheduler state
+ isSchedulerShutdown = new AtomicBoolean(true);
+ }
+
+ /**
+ * Initializes a scheduler instance for DMaaP MR Subscriber Job
+ *
+ * @throws SchedulerException SchedulerException
+ */
+ private void initializeScheduler(final TCAAppPreferences tcaAppPreferences,
+ final StdSchedulerFactory stdSchedulerFactory) throws SchedulerException {
+
+ // Get Subscriber polling interval
+ final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
+
+ // Subscriber Quartz Properties file
+ final String quartzSubscriberPropertiesFileName = AnalyticsConstants.TCA_QUARTZ_SUBSCRIBER_PROPERTIES_FILE_NAME;
+
+ // Create a new JobDataMap containing information required by TCA DMaaP Subscriber Job
+ final JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME, tcaSubscriberOutputStreamName);
+ jobDataMap.put(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME, getContext());
+ jobDataMap.put(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME, subscriber);
+ jobDataMap.put(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME, metrics);
+
+ // Create new publisher scheduler
+ scheduler = TCAUtils.createQuartzScheduler(subscriberPollingInterval, stdSchedulerFactory,
+ quartzSubscriberPropertiesFileName, jobDataMap, TCADMaaPMRSubscriberJob.class,
+ AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_JOB_NAME,
+ AnalyticsConstants.TCA_DMAAP_SUBSCRIBER_QUARTZ_TRIGGER_NAME);
+ }
+
+
+}