diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java | 73 |
1 files changed, 42 insertions, 31 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index d61e15016a..00d3fedfc8 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -20,6 +20,7 @@ package org.openecomp.sdc.be.components.distribution.engine; import fj.data.Either; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; @@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; @@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable { private AtomicBoolean status = null; private OperationalEnvironmentEntry environmentEntry; private CambriaHandler cambriaHandler = new CambriaHandler(); + private KafkaHandler kafkaHandler = new KafkaHandler(); private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, @@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable { @Override public void run() { - boolean result = false; - result = initFlow(); - if (result) { + if (initFlow()) { this.stopTask(); this.status.set(true); if (this.distributionEnginePollingTask != null) { @@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable { * @return */ public boolean initFlow() { - logger.trace("Start init flow for environment {}", this.envName); - Set<String> topicsList = null; - Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; - getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList())); - if (getTopicsRes.isRight()) { - CambriaErrorResponse status = getTopicsRes.right().value(); - if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { - topicsList = new HashSet<>(); + logger.info("Start init flow for environment {}", this.envName); + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + Set<String> topicsList; + Either<Set<String>, CambriaErrorResponse> getTopicsRes; + getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress())); + if (getTopicsRes.isRight()) { + CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value(); + if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { + topicsList = new HashSet<>(); + } else { + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, + "try retrieve list of topics from U-EB server"); + return false; + } } else { - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); + topicsList = getTopicsRes.left().value(); + } + String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); + logger.debug("Going to handle topic {}", notificationTopic); + if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { + return false; + } + CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, + SubscriberTypeEnum.PRODUCER); + CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); + if (createStatus != CambriaOperationStatus.OK) { + return false; + } + String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); + logger.debug("Going to handle topic {}", statusTopic); + if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { return false; } + CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); + return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } else { - topicsList = getTopicsRes.left().value(); + logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging"); + return true; } - String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); - logger.debug("Going to handle topic {}", notificationTopic); - if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { - return false; - } - CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); - CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); - if (createStatus != CambriaOperationStatus.OK) { - return false; - } - String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); - logger.debug("Going to handle topic {}", statusTopic); - if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { - return false; - } - CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); - return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { @@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable { protected void setCambriaHandler(CambriaHandler cambriaHandler) { this.cambriaHandler = cambriaHandler; } + + protected void setKafkaHandler(KafkaHandler kafkaHandler) { + this.kafkaHandler = kafkaHandler; + } } |