diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java | 106 |
1 files changed, 43 insertions, 63 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java index 4aa6136994..594e0b9418 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,11 +17,22 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.be.components.scheduledtasks; +import static org.apache.commons.collections.CollectionUtils.isEmpty; +import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction; + import com.google.common.annotations.VisibleForTesting; import fj.data.Either; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; import org.apache.commons.lang.math.NumberUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine; @@ -36,45 +47,28 @@ import org.openecomp.sdc.common.log.wrappers.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.annotation.Resource; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.commons.collections.CollectionUtils.isEmpty; -import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction; - @Component("recoveryThreadManager") public class RecoveryThreadManager extends AbstractScheduleTaskRunner { private static final Logger log = Logger.getLogger(RecoveryThreadManager.class); @VisibleForTesting FixEnvironmentTask task = new FixEnvironmentTask(); - + @VisibleForTesting + Integer allowedTimeBeforeStaleSec; @Resource private OperationalEnvironmentDao operationalEnvironmentDao; - @Autowired private EnvironmentsEngine environmentsEngine; - - private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(NumberUtils.INTEGER_ONE, - new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build()); - @VisibleForTesting - Integer allowedTimeBeforeStaleSec; + private ScheduledExecutorService scheduledService = Executors + .newScheduledThreadPool(NumberUtils.INTEGER_ONE, new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build()); @PostConstruct public void init() { log.debug("Enter init method of RecoveryThreadManager"); - final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager - .getConfigurationManager().getDistributionEngineConfiguration(); + final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager() + .getDistributionEngineConfiguration(); Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec(); - scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec, - TimeUnit.SECONDS); + scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec, TimeUnit.SECONDS); this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec(); log.debug("End init method of AsdcComponentsCleaner"); } @@ -84,29 +78,31 @@ public class RecoveryThreadManager extends AbstractScheduleTaskRunner { shutdownExecutor(); } + @Override + public ExecutorService getExecutorService() { + return scheduledService; + } + protected class FixEnvironmentTask implements Runnable { + @Override public void run() { try { // Failed Envs Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao - .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED); - eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords), convertToFunction( - cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError))); - + .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED); + eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords), + convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError))); // In-Progress Envs Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao - .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS); + .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS); eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords), - convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS, - cassandraError))); - + convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS, cassandraError))); // Envs To Connect to UEB topics Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao - .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED); - eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords), convertToFunction( - cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError))); - + .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED); + eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords), + convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError))); } catch (Exception e) { log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e); } @@ -114,49 +110,33 @@ public class RecoveryThreadManager extends AbstractScheduleTaskRunner { private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) { if (!isEmpty(completeEnvironmentsRecords)) { - completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env)) - .forEach(opEnvEntry -> { - environmentsEngine.createUebTopicsForEnvironment(opEnvEntry); - environmentsEngine.addToMap(opEnvEntry); - }); + completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env)).forEach(opEnvEntry -> { + environmentsEngine.createUebTopicsForEnvironment(opEnvEntry); + environmentsEngine.addToMap(opEnvEntry); + }); } - } private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) { if (!isEmpty(failedEnvironmentsRecords)) { - failedEnvironmentsRecords.parallelStream() - .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env)); + failedEnvironmentsRecords.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env)); } - } private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) { if (!isEmpty(inProgressEnvList)) { - long currentTimeMillis = System.currentTimeMillis(); if (!isEmpty(inProgressEnvList)) { List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream() - .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis) - .collect(Collectors.toList()); - staleInProgressEnvList.parallelStream() - .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env)); + .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis) + .collect(Collectors.toList()); + staleInProgressEnvList.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env)); } - } - } private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) { log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus); } - - } - - @Override - public ExecutorService getExecutorService() { - return scheduledService; - } - } |