diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java | 109 |
1 files changed, 42 insertions, 67 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index 52b1967397..6207cd2a5a 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,19 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.be.components.distribution.engine; import fj.data.Either; -import org.openecomp.sdc.be.config.BeEcompErrorManager; -import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; -import org.openecomp.sdc.be.impl.ComponentsUtils; -import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; -import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData; -import org.openecomp.sdc.common.log.wrappers.Logger; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; @@ -38,6 +28,14 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; +import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; +import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData; +import org.openecomp.sdc.common.log.wrappers.Logger; public class DistributionEngineInitTask implements Runnable { @@ -48,26 +46,25 @@ public class DistributionEngineInitTask implements Runnable { public static final String CREATED = "CREATED"; public static final String FAILED = "FAILED"; public static final Integer HTTP_OK = 200; - + private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName()); + boolean maximumRetryInterval = false; + ComponentsUtils componentsUtils = null; + DistributionEnginePollingTask distributionEnginePollingTask = null; + ScheduledFuture<?> scheduledFuture = null; private Long delayBeforeStartFlow = 0l; private DistributionEngineConfiguration deConfiguration; private String envName; private long retryInterval; private long currentRetryInterval; private long maxInterval; - boolean maximumRetryInterval = false; private AtomicBoolean status = null; - ComponentsUtils componentsUtils = null; - DistributionEnginePollingTask distributionEnginePollingTask = null; private OperationalEnvironmentEntry environmentEntry; - private CambriaHandler cambriaHandler = new CambriaHandler(); - private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName()); - ScheduledFuture<?> scheduledFuture = null; - - public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) { + + public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, + AtomicBoolean status, ComponentsUtils componentsUtils, + DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) { super(); this.delayBeforeStartFlow = delayBeforeStartFlow; this.deConfiguration = deConfiguration; @@ -81,26 +78,25 @@ public class DistributionEngineInitTask implements Runnable { this.environmentEntry = environmentEntry; } + public static String buildTopicName(String topicName, String environment) { + return topicName + "-" + environment.toUpperCase(); + } + public void startTask() { if (scheduledExecutorService != null) { Integer retryInterval = deConfiguration.getInitRetryIntervalSec(); - logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow); + logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, + delayBeforeStartFlow); this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS); - } } public void restartTask() { - this.stopTask(); - logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval); - long lastCurrentInterval = currentRetryInterval; incrementRetryInterval(); - this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS); - } protected void incrementRetryInterval() { @@ -140,10 +136,8 @@ public class DistributionEngineInitTask implements Runnable { @Override public void run() { - boolean result = false; result = initFlow(); - if (result) { this.stopTask(); this.status.set(true); @@ -165,12 +159,9 @@ public class DistributionEngineInitTask implements Runnable { * @return */ public boolean initFlow() { - logger.trace("Start init flow for environment {}", this.envName); - Set<String> topicsList = null; Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; - getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList())); if (getTopicsRes.isRight()) { CambriaErrorResponse status = getTopicsRes.right().value(); @@ -183,35 +174,29 @@ public class DistributionEngineInitTask implements Runnable { } else { topicsList = getTopicsRes.left().value(); } - String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); logger.debug("Going to handle topic {}", notificationTopic); if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { return false; } - CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); - CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); - if (createStatus != CambriaOperationStatus.OK) { return false; } - String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); logger.debug("Going to handle topic {}", statusTopic); if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { return false; } - CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); - return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { - CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName); - + CambriaErrorResponse registerStatus = cambriaHandler + .registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), + environmentEntry.getUebApikey(), subscriberType, topicName); String role = CONSUMER; if (subscriberType == SubscriberTypeEnum.PRODUCER) { role = PRODUCER; @@ -225,56 +210,50 @@ public class DistributionEngineInitTask implements Runnable { Integer httpCode = registerProducerStatus.getHttpCode(); String httpCodeStr = String.valueOf(httpCode); this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, - DistributionTopicData.newBuilder() - .notificationTopic(notificationTopic) - .build(), - role, environmentEntry.getUebApikey(), httpCodeStr); + DistributionTopicData.newBuilder().notificationTopic(notificationTopic).build(), role, environmentEntry.getUebApikey(), httpCodeStr); } } private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) { - DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder() - .statusTopic(topicName) - .build(); + DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().statusTopic(topicName).build(); return createDistributionTopic(topicsList, topicName, distributionTopicData); } private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) { - - DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder() - .notificationTopic(topicName) - .build(); + DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().notificationTopic(topicName).build(); return createDistributionTopic(topicsList, topicName, distributionTopicData); } private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) { - boolean isSucceeded = true; - if (topicsList.contains(topicName)) { if (componentsUtils != null) { - componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); + componentsUtils + .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); } return isSucceeded; } - CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(), - deConfiguration.getCreateTopic().getReplicationCount()); - + CambriaErrorResponse createDistribTopicStatus = cambriaHandler + .createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, + deConfiguration.getCreateTopic().getPartitionCount(), deConfiguration.getCreateTopic().getReplicationCount()); CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus(); switch (status) { case OK: if (componentsUtils != null) { - componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED); + componentsUtils + .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED); } break; case TOPIC_ALREADY_EXIST: if (componentsUtils != null) { - componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); + componentsUtils + .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS); } break; default: if (componentsUtils != null) { - componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED); + componentsUtils + .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED); } BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); isSucceeded = false; @@ -283,10 +262,6 @@ public class DistributionEngineInitTask implements Runnable { return isSucceeded; } - public static String buildTopicName(String topicName, String environment) { - return topicName + "-" + environment.toUpperCase(); - } - public boolean isActive() { return this.status.get(); } |