summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java73
1 files changed, 42 insertions, 31 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index d61e15016a..00d3fedfc8 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,6 +20,7 @@
package org.openecomp.sdc.be.components.distribution.engine;
import fj.data.Either;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
@@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable {
private AtomicBoolean status = null;
private OperationalEnvironmentEntry environmentEntry;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private KafkaHandler kafkaHandler = new KafkaHandler();
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
@@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable {
@Override
public void run() {
- boolean result = false;
- result = initFlow();
- if (result) {
+ if (initFlow()) {
this.stopTask();
this.status.set(true);
if (this.distributionEnginePollingTask != null) {
@@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable {
* @return
*/
public boolean initFlow() {
- logger.trace("Start init flow for environment {}", this.envName);
- Set<String> topicsList = null;
- Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
- getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
- if (getTopicsRes.isRight()) {
- CambriaErrorResponse status = getTopicsRes.right().value();
- if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
- topicsList = new HashSet<>();
+ logger.info("Start init flow for environment {}", this.envName);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ Set<String> topicsList;
+ Either<Set<String>, CambriaErrorResponse> getTopicsRes;
+ getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress()));
+ if (getTopicsRes.isRight()) {
+ CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value();
+ if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+ topicsList = new HashSet<>();
+ } else {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW,
+ "try retrieve list of topics from U-EB server");
+ return false;
+ }
} else {
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+ topicsList = getTopicsRes.left().value();
+ }
+ String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", notificationTopic);
+ if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
+ return false;
+ }
+ CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic,
+ SubscriberTypeEnum.PRODUCER);
+ CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+ if (createStatus != CambriaOperationStatus.OK) {
+ return false;
+ }
+ String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", statusTopic);
+ if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
return false;
}
+ CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+ return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
} else {
- topicsList = getTopicsRes.left().value();
+ logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging");
+ return true;
}
- String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
- logger.debug("Going to handle topic {}", notificationTopic);
- if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
- return false;
- }
- CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
- CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
- if (createStatus != CambriaOperationStatus.OK) {
- return false;
- }
- String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
- logger.debug("Going to handle topic {}", statusTopic);
- if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
- return false;
- }
- CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
- return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
}
private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
@@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable {
protected void setCambriaHandler(CambriaHandler cambriaHandler) {
this.cambriaHandler = cambriaHandler;
}
+
+ protected void setKafkaHandler(KafkaHandler kafkaHandler) {
+ this.kafkaHandler = kafkaHandler;
+ }
}