aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java343
1 files changed, 172 insertions, 171 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index fc7c473d6b..b4f4863284 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -20,188 +20,189 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import fj.data.Either;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.config.EcompErrorName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import fj.data.Either;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class DistributionEnginePollingTask implements Runnable {
- public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
-
- private String topicName;
- private ComponentsUtils componentUtils;
- private int fetchTimeoutInSec = 15;
- private int pollingIntervalInSec;
- private String consumerId;
- private String consumerGroup;
- private DistributionEngineConfiguration distributionEngineConfiguration;
-
- private CambriaHandler cambriaHandler = new CambriaHandler();
- private Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
- private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
-
- private static Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class.getName());
-
- ScheduledFuture<?> scheduledFuture = null;
- private CambriaConsumer cambriaConsumer = null;
-
- private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
-
- public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, String envName, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth) {
-
- this.componentUtils = componentUtils;
- this.distributionEngineConfiguration = distributionEngineConfiguration;
- DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
- this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
- this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
- this.consumerGroup = statusConfig.getConsumerGroup();
- this.consumerId = statusConfig.getConsumerId();
- this.distributionEngineClusterHealth = distributionEngineClusterHealth;
- }
-
- public void startTask(String topicName) {
-
- this.topicName = topicName;
- logger.debug("start task for polling topic {}", topicName);
- if (fetchTimeoutInSec < 15) {
- logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
- fetchTimeoutInSec = 15;
- }
- try {
- cambriaConsumer = cambriaHandler.createConsumer(distributionEngineConfiguration.getUebServers(), topicName, distributionEngineConfiguration.getUebPublicKey(), distributionEngineConfiguration.getUebSecretKey(), consumerId, consumerGroup,
- fetchTimeoutInSec * 1000);
-
- if (scheduledPollingService != null) {
- logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
- scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
-
- }
- } catch (Exception e) {
- logger.debug("unexpected error occured", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
- }
- }
-
- public void stopTask() {
- if (scheduledFuture != null) {
- boolean result = scheduledFuture.cancel(true);
- logger.debug("Stop polling task. result = {}", result);
- if (false == result) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
- }
- scheduledFuture = null;
- }
- if (cambriaConsumer != null) {
- logger.debug("close consumer");
- cambriaHandler.closeConsumer(cambriaConsumer);
- }
-
- }
-
- public void destroy() {
- this.stopTask();
- shutdownExecutor();
- }
-
- @Override
- public void run() {
- logger.trace("run() method. polling queue {}", topicName);
-
- try {
- // init error
- if (cambriaConsumer == null) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- stopTask();
- return;
- }
-
- Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
- // fetch error
- if (fetchResult.isRight()) {
- CambriaErrorResponse errorResponse = fetchResult.right().value();
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
-
- // TODO: if status== internal error (connection problem) change
- // state to inactive
- // in next try, if succeed - change to active
- return;
- }
-
- // success
- Iterable<String> messages = fetchResult.left().value();
- for (String message : messages) {
- logger.trace("received message {}", message);
- try {
- DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
- componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
- String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
-
- distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
-
- } catch (Exception e) {
- logger.debug("failed to convert message to object", e);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
- }
-
- }
- } catch (Exception e) {
- logger.debug("unexpected error occured", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
- }
-
- }
-
- private void shutdownExecutor() {
- if (scheduledPollingService == null)
- return;
-
- scheduledPollingService.shutdown(); // Disable new tasks from being
- // submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
- scheduledPollingService.shutdownNow(); // Cancel currently
- // executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
- logger.debug("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- scheduledPollingService.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
+ public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
+
+ private String topicName;
+ private ComponentsUtils componentUtils;
+ private int fetchTimeoutInSec = 15;
+ private int pollingIntervalInSec;
+ private String consumerId;
+ private String consumerGroup;
+
+ private CambriaHandler cambriaHandler = new CambriaHandler();
+ private Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ private DistributionCompleteReporter distributionCompleteReporter;
+
+ private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class);
+
+ ScheduledFuture<?> scheduledFuture = null;
+ private CambriaConsumer cambriaConsumer = null;
+
+ private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
+
+ private OperationalEnvironmentEntry environmentEntry;
+
+ public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
+
+ this.componentUtils = componentUtils;
+ DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
+ this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
+ this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
+ this.consumerGroup = statusConfig.getConsumerGroup();
+ this.consumerId = statusConfig.getConsumerId();
+ this.distributionEngineClusterHealth = distributionEngineClusterHealth;
+ this.environmentEntry = environmentEntry;
+ this.distributionCompleteReporter = distributionCompleteReporter;
+ }
+
+ public void startTask(String topicName) {
+
+ this.topicName = topicName;
+ logger.debug("start task for polling topic {}", topicName);
+ if (fetchTimeoutInSec < 15) {
+ logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
+ fetchTimeoutInSec = 15;
+ }
+ try {
+ cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
+ fetchTimeoutInSec * 1000);
+
+ if (scheduledPollingService != null) {
+ logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
+ scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
+
+ }
+ } catch (Exception e) {
+ logger.debug("unexpected error occured", e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ }
+ }
+
+ public void stopTask() {
+ if (scheduledFuture != null) {
+ boolean result = scheduledFuture.cancel(true);
+ logger.debug("Stop polling task. result = {}", result);
+ if (false == result) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
+ }
+ scheduledFuture = null;
+ }
+ if (cambriaConsumer != null) {
+ logger.debug("close consumer");
+ cambriaHandler.closeConsumer(cambriaConsumer);
+ }
+
+ }
+
+ public void destroy() {
+ this.stopTask();
+ shutdownExecutor();
+ }
+
+ @Override
+ public void run() {
+ logger.trace("run() method. polling queue {}", topicName);
+
+ try {
+ // init error
+ if (cambriaConsumer == null) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+ stopTask();
+ return;
+ }
+
+ Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+ // fetch error
+ if (fetchResult.isRight()) {
+ CambriaErrorResponse errorResponse = fetchResult.right().value();
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
+
+ // TODO: if status== internal error (connection problem) change
+ // state to inactive
+ // in next try, if succeed - change to active
+ return;
+ }
+
+ // success
+ Iterable<String> messages = fetchResult.left().value();
+ for (String message : messages) {
+ logger.trace("received message {}", message);
+ try {
+ DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
+ handleDistributionNotificationMsg(notification);
+ distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
+ } catch (Exception e) {
+ logger.debug("failed to convert message to object", e);
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
+ }
+
+ }
+ } catch (Exception e) {
+ logger.debug("unexpected error occured", e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ }
+
+ }
+
+ private void handleDistributionNotificationMsg(DistributionStatusNotification notification) {
+ componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
+ String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
+ if (notification.isDistributionCompleteNotification()) {
+ distributionCompleteReporter.reportDistributionComplete(notification);
+ }
+ }
+
+ private void shutdownExecutor() {
+ if (scheduledPollingService == null)
+ return;
+
+ scheduledPollingService.shutdown(); // Disable new tasks from being
+ // submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduledPollingService.shutdownNow(); // Cancel currently
+ // executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
+ logger.debug("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ scheduledPollingService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}