diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java | 101 |
1 files changed, 60 insertions, 41 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index 043efdf81d..2b1e716fa0 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -20,6 +20,16 @@ package org.openecomp.sdc.be.components.distribution.engine; +import fj.data.Either; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; +import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; +import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData; +import org.openecomp.sdc.common.log.wrappers.Logger; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; @@ -29,17 +39,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.openecomp.sdc.be.config.BeEcompErrorManager; -import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; -import org.openecomp.sdc.be.impl.ComponentsUtils; -import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; -import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import fj.data.Either; - public class DistributionEngineInitTask implements Runnable { public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine"; @@ -80,7 +79,7 @@ public class DistributionEngineInitTask implements Runnable { private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class); + private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName()); ScheduledFuture<?> scheduledFuture = null; @@ -127,7 +126,7 @@ public class DistributionEngineInitTask implements Runnable { if (scheduledFuture != null) { boolean result = scheduledFuture.cancel(true); logger.debug("Stop reinit task. result = {}", result); - if (false == result) { + if (!result) { BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task"); } scheduledFuture = null; @@ -147,7 +146,7 @@ public class DistributionEngineInitTask implements Runnable { boolean result = false; result = initFlow(); - if (true == result) { + if (result) { this.stopTask(); this.status.set(true); if (this.distributionEnginePollingTask != null) { @@ -156,7 +155,7 @@ public class DistributionEngineInitTask implements Runnable { this.distributionEnginePollingTask.startTask(topicName); } } else { - if (false == maximumRetryInterval) { + if (!maximumRetryInterval) { this.restartTask(); } } @@ -189,9 +188,7 @@ public class DistributionEngineInitTask implements Runnable { String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); logger.debug("Going to handle topic {}", notificationTopic); - - boolean status = createTopicIfNotExists(topicsList, notificationTopic); - if (false == status) { + if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { return false; } @@ -205,8 +202,7 @@ public class DistributionEngineInitTask implements Runnable { String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); logger.debug("Going to handle topic {}", statusTopic); - status = createTopicIfNotExists(topicsList, statusTopic); - if (false == status) { + if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { return false; } @@ -234,40 +230,63 @@ public class DistributionEngineInitTask implements Runnable { if (componentsUtils != null) { Integer httpCode = registerProducerStatus.getHttpCode(); String httpCodeStr = String.valueOf(httpCode); - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr); + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, + DistributionTopicData.newBuilder() + .notificationTopic(notificationTopic) + .build(), + role, environmentEntry.getUebApikey(), httpCodeStr); } } - private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) { + private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) { + DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder() + .statusTopic(topicName) + .build(); + return createDistributionTopic(topicsList, topicName, distributionTopicData); + } + + private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) { + + DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder() + .notificationTopic(topicName) + .build(); + return createDistributionTopic(topicsList, topicName, distributionTopicData); + } + + private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) { + + boolean isSucceeded = true; if (topicsList.contains(topicName)) { if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); + componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); } - return true; + return isSucceeded; } - CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(), deConfiguration.getCreateTopic().getReplicationCount()); CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus(); - if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); - } - } else if (status == CambriaOperationStatus.OK) { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED); - } - } else { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED); - } - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); - return false; + switch (status) { + case OK: + if (componentsUtils != null) { + componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED); + } + break; + case TOPIC_ALREADY_EXIST: + if (componentsUtils != null) { + componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); + } + break; + default: + if (componentsUtils != null) { + componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED); + } + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); + isSucceeded = false; + break; } - - return true; + return isSucceeded; } public static String buildTopicName(String topicName, String environment) { |