summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java109
1 files changed, 42 insertions, 67 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index 52b1967397..6207cd2a5a 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.openecomp.sdc.be.components.distribution.engine;
import fj.data.Either;
-import org.openecomp.sdc.be.config.BeEcompErrorManager;
-import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
-import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
-import org.openecomp.sdc.be.impl.ComponentsUtils;
-import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
-import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
-import org.openecomp.sdc.common.log.wrappers.Logger;
-
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -38,6 +28,14 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
+import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
+import org.openecomp.sdc.common.log.wrappers.Logger;
public class DistributionEngineInitTask implements Runnable {
@@ -48,26 +46,25 @@ public class DistributionEngineInitTask implements Runnable {
public static final String CREATED = "CREATED";
public static final String FAILED = "FAILED";
public static final Integer HTTP_OK = 200;
-
+ private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
+ boolean maximumRetryInterval = false;
+ ComponentsUtils componentsUtils = null;
+ DistributionEnginePollingTask distributionEnginePollingTask = null;
+ ScheduledFuture<?> scheduledFuture = null;
private Long delayBeforeStartFlow = 0l;
private DistributionEngineConfiguration deConfiguration;
private String envName;
private long retryInterval;
private long currentRetryInterval;
private long maxInterval;
- boolean maximumRetryInterval = false;
private AtomicBoolean status = null;
- ComponentsUtils componentsUtils = null;
- DistributionEnginePollingTask distributionEnginePollingTask = null;
private OperationalEnvironmentEntry environmentEntry;
-
private CambriaHandler cambriaHandler = new CambriaHandler();
-
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
- private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
- ScheduledFuture<?> scheduledFuture = null;
-
- public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
+
+ public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
+ AtomicBoolean status, ComponentsUtils componentsUtils,
+ DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
super();
this.delayBeforeStartFlow = delayBeforeStartFlow;
this.deConfiguration = deConfiguration;
@@ -81,26 +78,25 @@ public class DistributionEngineInitTask implements Runnable {
this.environmentEntry = environmentEntry;
}
+ public static String buildTopicName(String topicName, String environment) {
+ return topicName + "-" + environment.toUpperCase();
+ }
+
public void startTask() {
if (scheduledExecutorService != null) {
Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
- logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
+ logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval,
+ delayBeforeStartFlow);
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
-
}
}
public void restartTask() {
-
this.stopTask();
-
logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
-
long lastCurrentInterval = currentRetryInterval;
incrementRetryInterval();
-
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
-
}
protected void incrementRetryInterval() {
@@ -140,10 +136,8 @@ public class DistributionEngineInitTask implements Runnable {
@Override
public void run() {
-
boolean result = false;
result = initFlow();
-
if (result) {
this.stopTask();
this.status.set(true);
@@ -165,12 +159,9 @@ public class DistributionEngineInitTask implements Runnable {
* @return
*/
public boolean initFlow() {
-
logger.trace("Start init flow for environment {}", this.envName);
-
Set<String> topicsList = null;
Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-
getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
if (getTopicsRes.isRight()) {
CambriaErrorResponse status = getTopicsRes.right().value();
@@ -183,35 +174,29 @@ public class DistributionEngineInitTask implements Runnable {
} else {
topicsList = getTopicsRes.left().value();
}
-
String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
logger.debug("Going to handle topic {}", notificationTopic);
if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
return false;
}
-
CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-
CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-
if (createStatus != CambriaOperationStatus.OK) {
return false;
}
-
String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
logger.debug("Going to handle topic {}", statusTopic);
if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
return false;
}
-
CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-
return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
}
private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
- CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
-
+ CambriaErrorResponse registerStatus = cambriaHandler
+ .registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
+ environmentEntry.getUebApikey(), subscriberType, topicName);
String role = CONSUMER;
if (subscriberType == SubscriberTypeEnum.PRODUCER) {
role = PRODUCER;
@@ -225,56 +210,50 @@ public class DistributionEngineInitTask implements Runnable {
Integer httpCode = registerProducerStatus.getHttpCode();
String httpCodeStr = String.valueOf(httpCode);
this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
- DistributionTopicData.newBuilder()
- .notificationTopic(notificationTopic)
- .build(),
- role, environmentEntry.getUebApikey(), httpCodeStr);
+ DistributionTopicData.newBuilder().notificationTopic(notificationTopic).build(), role, environmentEntry.getUebApikey(), httpCodeStr);
}
}
private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
- DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
- .statusTopic(topicName)
- .build();
+ DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().statusTopic(topicName).build();
return createDistributionTopic(topicsList, topicName, distributionTopicData);
}
private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
-
- DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
- .notificationTopic(topicName)
- .build();
+ DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().notificationTopic(topicName).build();
return createDistributionTopic(topicsList, topicName, distributionTopicData);
}
private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
-
boolean isSucceeded = true;
-
if (topicsList.contains(topicName)) {
if (componentsUtils != null) {
- componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
+ componentsUtils
+ .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
}
return isSucceeded;
}
- CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
- deConfiguration.getCreateTopic().getReplicationCount());
-
+ CambriaErrorResponse createDistribTopicStatus = cambriaHandler
+ .createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName,
+ deConfiguration.getCreateTopic().getPartitionCount(), deConfiguration.getCreateTopic().getReplicationCount());
CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
switch (status) {
case OK:
if (componentsUtils != null) {
- componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
+ componentsUtils
+ .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
}
break;
case TOPIC_ALREADY_EXIST:
if (componentsUtils != null) {
- componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
+ componentsUtils
+ .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
}
break;
default:
if (componentsUtils != null) {
- componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
+ componentsUtils
+ .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
}
BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
isSucceeded = false;
@@ -283,10 +262,6 @@ public class DistributionEngineInitTask implements Runnable {
return isSucceeded;
}
- public static String buildTopicName(String topicName, String environment) {
- return topicName + "-" + environment.toUpperCase();
- }
-
public boolean isActive() {
return this.status.get();
}