aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java512
1 files changed, 253 insertions, 259 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index 1eeaa1229e..1759f69b3e 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,274 +20,268 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import fj.data.Either;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.config.EcompErrorName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import fj.data.Either;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
public class DistributionEngineInitTask implements Runnable {
- public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
- public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
- public static final String CONSUMER = "CONSUMER";
- public static final String PRODUCER = "PRODUCER";
- public static final String CREATED = "CREATED";
- public static final String FAILED = "FAILED";
- public static final Integer HTTP_OK = 200;
-
- private Long delayBeforeStartFlow = 0l;
- private DistributionEngineConfiguration deConfiguration;
- private String envName;
- private long retryInterval;
- private long currentRetryInterval;
- private long maxInterval;
- // private boolean active = false;
- boolean maximumRetryInterval = false;
- private AtomicBoolean status = null;
- ComponentsUtils componentsUtils = null;
- DistributionEnginePollingTask distributionEnginePollingTask = null;
-
- private CambriaHandler cambriaHandler = new CambriaHandler();
-
- public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) {
- super();
- this.delayBeforeStartFlow = delayBeforeStartFlow;
- this.deConfiguration = deConfiguration;
- this.envName = envName;
- this.retryInterval = deConfiguration.getInitRetryIntervalSec();
- this.currentRetryInterval = retryInterval;
- this.maxInterval = deConfiguration.getInitMaxIntervalSec();
- this.status = status;
- this.componentsUtils = componentsUtils;
- this.distributionEnginePollingTask = distributionEnginePollingTask;
- }
-
- private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-
- private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName());
-
- ScheduledFuture<?> scheduledFuture = null;
-
- public void startTask() {
- if (scheduledExecutorService != null) {
- Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
- logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
- this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
-
- }
- }
-
- public void restartTask() {
-
- this.stopTask();
-
- logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
-
- long lastCurrentInterval = currentRetryInterval;
- incrementRetryInterval();
-
- this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
-
- }
-
- protected void incrementRetryInterval() {
- if (currentRetryInterval < maxInterval) {
- currentRetryInterval *= 2;
- if (currentRetryInterval > maxInterval) {
- setMaxRetryInterval();
- }
- } else {
- setMaxRetryInterval();
- }
- }
-
- private void setMaxRetryInterval() {
- currentRetryInterval = maxInterval;
- maximumRetryInterval = true;
- logger.debug("Set next retry init interval to {}", maxInterval);
- }
-
- public void stopTask() {
- if (scheduledFuture != null) {
- boolean result = scheduledFuture.cancel(true);
- logger.debug("Stop reinit task. result = {}", result);
- if (false == result) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
- }
- scheduledFuture = null;
- }
- }
-
- public void destroy() {
- this.stopTask();
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdown();
- }
- }
-
- @Override
- public void run() {
-
- boolean result = false;
- result = initFlow();
-
- if (true == result) {
- this.stopTask();
- this.status.set(true);
- if (this.distributionEnginePollingTask != null) {
- String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
- logger.debug("start polling distribution status topic {}", topicName);
- this.distributionEnginePollingTask.startTask(topicName);
- }
- } else {
- if (false == maximumRetryInterval) {
- this.restartTask();
- }
- }
- }
-
- /**
- * run initialization flow
- *
- * @return
- */
- public boolean initFlow() {
-
- logger.trace("Start init flow for environment {}", this.envName);
-
- Set<String> topicsList = null;
- Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-
- getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers());
- if (getTopicsRes.isRight()) {
- CambriaErrorResponse status = getTopicsRes.right().value();
- if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
- topicsList = new HashSet<>();
- } else {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
- return false;
- }
- } else {
- topicsList = getTopicsRes.left().value();
- }
-
- String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
- logger.debug("Going to handle topic {}", notificationTopic);
-
- boolean status = createTopicIfNotExists(topicsList, notificationTopic);
- if (false == status) {
- return false;
- }
-
- CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-
- CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-
- if (createStatus != CambriaOperationStatus.OK) {
- return false;
- }
-
- String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
- logger.debug("Going to handle topic {}", statusTopic);
- status = createTopicIfNotExists(topicsList, statusTopic);
- if (false == status) {
- return false;
- }
-
- CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-
- if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
- return false;
- }
-
- return true;
- }
-
- private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
- CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType);
-
- String role = CONSUMER;
- if (subscriberType == SubscriberTypeEnum.PRODUCER) {
- role = PRODUCER;
- }
- auditRegistration(topicName, registerStatus, role);
- return registerStatus;
- }
-
- private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
- if (componentsUtils != null) {
- Integer httpCode = registerProducerStatus.getHttpCode();
- String httpCodeStr = String.valueOf(httpCode);
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr);
- }
- }
-
- private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
-
- if (topicsList.contains(topicName)) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
- }
- return true;
- }
-
- CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
- deConfiguration.getCreateTopic().getReplicationCount());
-
- CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
- if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
- }
- } else if (status == CambriaOperationStatus.OK) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
- }
- } else {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
- }
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
- return false;
- }
-
- return true;
- }
-
- public static String buildTopicName(String topicName, String environment) {
- return topicName + "-" + environment.toUpperCase();
- }
-
- public boolean isActive() {
- return this.status.get();
- }
-
- public long getCurrentRetryInterval() {
- return currentRetryInterval;
- }
-
- protected void setCambriaHandler(CambriaHandler cambriaHandler) {
- this.cambriaHandler = cambriaHandler;
- }
+ public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
+ public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
+ public static final String CONSUMER = "CONSUMER";
+ public static final String PRODUCER = "PRODUCER";
+ public static final String CREATED = "CREATED";
+ public static final String FAILED = "FAILED";
+ public static final Integer HTTP_OK = 200;
+
+ private Long delayBeforeStartFlow = 0l;
+ private DistributionEngineConfiguration deConfiguration;
+ private String envName;
+ private long retryInterval;
+ private long currentRetryInterval;
+ private long maxInterval;
+ boolean maximumRetryInterval = false;
+ private AtomicBoolean status = null;
+ ComponentsUtils componentsUtils = null;
+ DistributionEnginePollingTask distributionEnginePollingTask = null;
+ private OperationalEnvironmentEntry environmentEntry;
+
+ private CambriaHandler cambriaHandler = new CambriaHandler();
+
+ public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
+ super();
+ this.delayBeforeStartFlow = delayBeforeStartFlow;
+ this.deConfiguration = deConfiguration;
+ this.envName = envName;
+ this.retryInterval = deConfiguration.getInitRetryIntervalSec();
+ this.currentRetryInterval = retryInterval;
+ this.maxInterval = deConfiguration.getInitMaxIntervalSec();
+ this.status = status;
+ this.componentsUtils = componentsUtils;
+ this.distributionEnginePollingTask = distributionEnginePollingTask;
+ this.environmentEntry = environmentEntry;
+ }
+
+ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class);
+
+ ScheduledFuture<?> scheduledFuture = null;
+
+ public void startTask() {
+ if (scheduledExecutorService != null) {
+ Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
+ logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
+ this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
+
+ }
+ }
+
+ public void restartTask() {
+
+ this.stopTask();
+
+ logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
+
+ long lastCurrentInterval = currentRetryInterval;
+ incrementRetryInterval();
+
+ this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
+
+ }
+
+ protected void incrementRetryInterval() {
+ if (currentRetryInterval < maxInterval) {
+ currentRetryInterval *= 2;
+ if (currentRetryInterval > maxInterval) {
+ setMaxRetryInterval();
+ }
+ } else {
+ setMaxRetryInterval();
+ }
+ }
+
+ private void setMaxRetryInterval() {
+ currentRetryInterval = maxInterval;
+ maximumRetryInterval = true;
+ logger.debug("Set next retry init interval to {}", maxInterval);
+ }
+
+ public void stopTask() {
+ if (scheduledFuture != null) {
+ boolean result = scheduledFuture.cancel(true);
+ logger.debug("Stop reinit task. result = {}", result);
+ if (false == result) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
+ }
+ scheduledFuture = null;
+ }
+ }
+
+ public void destroy() {
+ this.stopTask();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdown();
+ }
+ }
+
+ @Override
+ public void run() {
+
+ boolean result = false;
+ result = initFlow();
+
+ if (true == result) {
+ this.stopTask();
+ this.status.set(true);
+ if (this.distributionEnginePollingTask != null) {
+ String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
+ logger.debug("start polling distribution status topic {}", topicName);
+ this.distributionEnginePollingTask.startTask(topicName);
+ }
+ } else {
+ if (false == maximumRetryInterval) {
+ this.restartTask();
+ }
+ }
+ }
+
+ /**
+ * run initialization flow
+ *
+ * @return
+ */
+ public boolean initFlow() {
+
+ logger.trace("Start init flow for environment {}", this.envName);
+
+ Set<String> topicsList = null;
+ Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
+
+ getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
+ if (getTopicsRes.isRight()) {
+ CambriaErrorResponse status = getTopicsRes.right().value();
+ if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+ topicsList = new HashSet<>();
+ } else {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+ return false;
+ }
+ } else {
+ topicsList = getTopicsRes.left().value();
+ }
+
+ String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", notificationTopic);
+
+ boolean status = createTopicIfNotExists(topicsList, notificationTopic);
+ if (false == status) {
+ return false;
+ }
+
+ CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
+
+ CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+
+ if (createStatus != CambriaOperationStatus.OK) {
+ return false;
+ }
+
+ String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", statusTopic);
+ status = createTopicIfNotExists(topicsList, statusTopic);
+ if (false == status) {
+ return false;
+ }
+
+ CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+
+ if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
+ CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
+
+ String role = CONSUMER;
+ if (subscriberType == SubscriberTypeEnum.PRODUCER) {
+ role = PRODUCER;
+ }
+ auditRegistration(topicName, registerStatus, role);
+ return registerStatus;
+ }
+
+ private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
+ if (componentsUtils != null) {
+ Integer httpCode = registerProducerStatus.getHttpCode();
+ String httpCodeStr = String.valueOf(httpCode);
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr);
+ }
+ }
+
+ private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
+
+ if (topicsList.contains(topicName)) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+ }
+ return true;
+ }
+
+ CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
+ deConfiguration.getCreateTopic().getReplicationCount());
+
+ CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
+ if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+ }
+ } else if (status == CambriaOperationStatus.OK) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
+ }
+ } else {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
+ }
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
+ return false;
+ }
+
+ return true;
+ }
+
+ public static String buildTopicName(String topicName, String environment) {
+ return topicName + "-" + environment.toUpperCase();
+ }
+
+ public boolean isActive() {
+ return this.status.get();
+ }
+
+ public long getCurrentRetryInterval() {
+ return currentRetryInterval;
+ }
+
+ protected void setCambriaHandler(CambriaHandler cambriaHandler) {
+ this.cambriaHandler = cambriaHandler;
+ }
}