diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java | 77 |
1 files changed, 34 insertions, 43 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java index 933d3ef4a1..124671086f 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,13 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.be.components.distribution.engine; import com.att.nsa.cambria.client.CambriaConsumer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import fj.data.Either; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter; import org.openecomp.sdc.be.config.BeEcompErrorManager; @@ -34,41 +37,32 @@ import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; import org.openecomp.sdc.common.log.wrappers.Logger; import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - public class DistributionEnginePollingTask implements Runnable { public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling"; private static final String PARTNER_NAME = "UNKNOWN"; - + private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName()); + private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class); + ScheduledFuture<?> scheduledFuture = null; private String topicName; private ComponentsUtils componentUtils; private int fetchTimeoutInSec = 15; private int pollingIntervalInSec; private String consumerId; private String consumerGroup; - private CambriaHandler cambriaHandler = new CambriaHandler(); private Gson gson = new GsonBuilder().setPrettyPrinting().create(); private DistributionCompleteReporter distributionCompleteReporter; - - private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build()); - - private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName()); - private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class); - - ScheduledFuture<?> scheduledFuture = null; + private ScheduledExecutorService scheduledPollingService = Executors + .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build()); private CambriaConsumer cambriaConsumer = null; - private DistributionEngineClusterHealth distributionEngineClusterHealth = null; - private OperationalEnvironmentEntry environmentEntry; - public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) { - + public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, + DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, + DistributionEngineClusterHealth distributionEngineClusterHealth, + OperationalEnvironmentEntry environmentEntry) { this.componentUtils = componentUtils; DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic(); this.pollingIntervalInSec = statusConfig.getPollingIntervalSec(); @@ -81,7 +75,6 @@ public class DistributionEnginePollingTask implements Runnable { } public void startTask(String topicName) { - this.topicName = topicName; logger.debug("start task for polling topic {}", topicName); if (fetchTimeoutInSec < 15) { @@ -89,13 +82,12 @@ public class DistributionEnginePollingTask implements Runnable { fetchTimeoutInSec = 15; } try { - cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup, - fetchTimeoutInSec * 1000); - + cambriaConsumer = cambriaHandler + .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), + consumerId, consumerGroup, fetchTimeoutInSec * 1000); if (scheduledPollingService != null) { logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec); scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS); - } } catch (Exception e) { logger.debug("unexpected error occured", e); @@ -117,7 +109,6 @@ public class DistributionEnginePollingTask implements Runnable { logger.debug("close consumer"); cambriaHandler.closeConsumer(cambriaConsumer); } - } public void destroy() { @@ -128,7 +119,6 @@ public class DistributionEnginePollingTask implements Runnable { @Override public void run() { logger.trace("run() method. polling queue {}", topicName); - try { // init error if (cambriaConsumer == null) { @@ -136,19 +126,19 @@ public class DistributionEnginePollingTask implements Runnable { stopTask(); return; } - Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); // fetch error if (fetchResult.isRight()) { CambriaErrorResponse errorResponse = fetchResult.right().value(); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse); - + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, + "failed to fetch messages from topic " + topicName + " error: " + errorResponse); // TODO: if status== internal error (connection problem) change + // state to inactive + // in next try, if succeed - change to active return; } - // success Iterable<String> messages = fetchResult.left().value(); for (String message : messages) { @@ -160,42 +150,44 @@ public class DistributionEnginePollingTask implements Runnable { distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown(); } catch (Exception e) { logger.debug("failed to convert message to object", e); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value()); + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, + "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value()); } - } } catch (Exception e) { logger.debug("unexpected error occurred", e); String methodName = Object.class.getEnclosingMethod().getName(); BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); } - } private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) { - componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), - notification.getConsumerID(), topicName, notification.getArtifactURL(), - String.valueOf(notification.getTimestamp()), notification.getStatus().name(), - notification.getErrorReason(), audit); + componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName, + notification.getArtifactURL(), String.valueOf(notification.getTimestamp()), notification.getStatus().name(), + notification.getErrorReason(), audit); if (notification.isDistributionCompleteNotification()) { distributionCompleteReporter.reportDistributionComplete(notification); } } private void shutdownExecutor() { - if (scheduledPollingService == null) + if (scheduledPollingService == null) { return; - + } scheduledPollingService.shutdown(); // Disable new tasks from being + // submitted try { // Wait a while for existing tasks to terminate if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) { scheduledPollingService.shutdownNow(); // Cancel currently + // executing tasks + // Wait a while for tasks to respond to being cancelled - if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) + if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) { logger.debug("Pool did not terminate"); + } } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted @@ -204,5 +196,4 @@ public class DistributionEnginePollingTask implements Runnable { Thread.currentThread().interrupt(); } } - } |