summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java77
1 files changed, 34 insertions, 43 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index 933d3ef4a1..124671086f 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.openecomp.sdc.be.components.distribution.engine;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import fj.data.Either;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
@@ -34,41 +37,32 @@ import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.common.log.wrappers.Logger;
import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
public class DistributionEnginePollingTask implements Runnable {
public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
private static final String PARTNER_NAME = "UNKNOWN";
-
+ private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
+ private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
+ ScheduledFuture<?> scheduledFuture = null;
private String topicName;
private ComponentsUtils componentUtils;
private int fetchTimeoutInSec = 15;
private int pollingIntervalInSec;
private String consumerId;
private String consumerGroup;
-
private CambriaHandler cambriaHandler = new CambriaHandler();
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
private DistributionCompleteReporter distributionCompleteReporter;
-
- private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
-
- private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
- private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
-
- ScheduledFuture<?> scheduledFuture = null;
+ private ScheduledExecutorService scheduledPollingService = Executors
+ .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
private CambriaConsumer cambriaConsumer = null;
-
private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
-
private OperationalEnvironmentEntry environmentEntry;
- public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
-
+ public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration,
+ DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils,
+ DistributionEngineClusterHealth distributionEngineClusterHealth,
+ OperationalEnvironmentEntry environmentEntry) {
this.componentUtils = componentUtils;
DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
@@ -81,7 +75,6 @@ public class DistributionEnginePollingTask implements Runnable {
}
public void startTask(String topicName) {
-
this.topicName = topicName;
logger.debug("start task for polling topic {}", topicName);
if (fetchTimeoutInSec < 15) {
@@ -89,13 +82,12 @@ public class DistributionEnginePollingTask implements Runnable {
fetchTimeoutInSec = 15;
}
try {
- cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
- fetchTimeoutInSec * 1000);
-
+ cambriaConsumer = cambriaHandler
+ .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
+ consumerId, consumerGroup, fetchTimeoutInSec * 1000);
if (scheduledPollingService != null) {
logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
-
}
} catch (Exception e) {
logger.debug("unexpected error occured", e);
@@ -117,7 +109,6 @@ public class DistributionEnginePollingTask implements Runnable {
logger.debug("close consumer");
cambriaHandler.closeConsumer(cambriaConsumer);
}
-
}
public void destroy() {
@@ -128,7 +119,6 @@ public class DistributionEnginePollingTask implements Runnable {
@Override
public void run() {
logger.trace("run() method. polling queue {}", topicName);
-
try {
// init error
if (cambriaConsumer == null) {
@@ -136,19 +126,19 @@ public class DistributionEnginePollingTask implements Runnable {
stopTask();
return;
}
-
Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
// fetch error
if (fetchResult.isRight()) {
CambriaErrorResponse errorResponse = fetchResult.right().value();
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
-
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
+ "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
// TODO: if status== internal error (connection problem) change
+
// state to inactive
+
// in next try, if succeed - change to active
return;
}
-
// success
Iterable<String> messages = fetchResult.left().value();
for (String message : messages) {
@@ -160,42 +150,44 @@ public class DistributionEnginePollingTask implements Runnable {
distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
} catch (Exception e) {
logger.debug("failed to convert message to object", e);
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
+ "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
}
-
}
} catch (Exception e) {
logger.debug("unexpected error occurred", e);
String methodName = Object.class.getEnclosingMethod().getName();
BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
}
-
}
private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) {
- componentUtils.auditDistributionStatusNotification(notification.getDistributionID(),
- notification.getConsumerID(), topicName, notification.getArtifactURL(),
- String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
- notification.getErrorReason(), audit);
+ componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName,
+ notification.getArtifactURL(), String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
+ notification.getErrorReason(), audit);
if (notification.isDistributionCompleteNotification()) {
distributionCompleteReporter.reportDistributionComplete(notification);
}
}
private void shutdownExecutor() {
- if (scheduledPollingService == null)
+ if (scheduledPollingService == null) {
return;
-
+ }
scheduledPollingService.shutdown(); // Disable new tasks from being
+
// submitted
try {
// Wait a while for existing tasks to terminate
if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
scheduledPollingService.shutdownNow(); // Cancel currently
+
// executing tasks
+
// Wait a while for tasks to respond to being cancelled
- if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
+ if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
logger.debug("Pool did not terminate");
+ }
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
@@ -204,5 +196,4 @@ public class DistributionEnginePollingTask implements Runnable {
Thread.currentThread().interrupt();
}
}
-
}