summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java106
1 files changed, 43 insertions, 63 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java
index 4aa6136994..594e0b9418 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/scheduledtasks/RecoveryThreadManager.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,11 +17,22 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.openecomp.sdc.be.components.scheduledtasks;
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
+import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
+
import com.google.common.annotations.VisibleForTesting;
import fj.data.Either;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine;
@@ -36,45 +47,28 @@ import org.openecomp.sdc.common.log.wrappers.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.commons.collections.CollectionUtils.isEmpty;
-import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
-
@Component("recoveryThreadManager")
public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
private static final Logger log = Logger.getLogger(RecoveryThreadManager.class);
@VisibleForTesting
FixEnvironmentTask task = new FixEnvironmentTask();
-
+ @VisibleForTesting
+ Integer allowedTimeBeforeStaleSec;
@Resource
private OperationalEnvironmentDao operationalEnvironmentDao;
-
@Autowired
private EnvironmentsEngine environmentsEngine;
-
- private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(NumberUtils.INTEGER_ONE,
- new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
- @VisibleForTesting
- Integer allowedTimeBeforeStaleSec;
+ private ScheduledExecutorService scheduledService = Executors
+ .newScheduledThreadPool(NumberUtils.INTEGER_ONE, new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
@PostConstruct
public void init() {
log.debug("Enter init method of RecoveryThreadManager");
- final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager
- .getConfigurationManager().getDistributionEngineConfiguration();
+ final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration();
Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec();
- scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec,
- TimeUnit.SECONDS);
+ scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec, TimeUnit.SECONDS);
this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec();
log.debug("End init method of AsdcComponentsCleaner");
}
@@ -84,29 +78,31 @@ public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
shutdownExecutor();
}
+ @Override
+ public ExecutorService getExecutorService() {
+ return scheduledService;
+ }
+
protected class FixEnvironmentTask implements Runnable {
+
@Override
public void run() {
try {
// Failed Envs
Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao
- .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
- eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords), convertToFunction(
- cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
-
+ .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
+ eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords),
+ convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
// In-Progress Envs
Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao
- .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
+ .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords),
- convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS,
- cassandraError)));
-
+ convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS, cassandraError)));
// Envs To Connect to UEB topics
Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao
- .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
- eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords), convertToFunction(
- cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
-
+ .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
+ eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords),
+ convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
} catch (Exception e) {
log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e);
}
@@ -114,49 +110,33 @@ public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) {
if (!isEmpty(completeEnvironmentsRecords)) {
- completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env))
- .forEach(opEnvEntry -> {
- environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
- environmentsEngine.addToMap(opEnvEntry);
- });
+ completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env)).forEach(opEnvEntry -> {
+ environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
+ environmentsEngine.addToMap(opEnvEntry);
+ });
}
-
}
private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) {
if (!isEmpty(failedEnvironmentsRecords)) {
- failedEnvironmentsRecords.parallelStream()
- .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
+ failedEnvironmentsRecords.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
}
-
}
private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) {
if (!isEmpty(inProgressEnvList)) {
-
long currentTimeMillis = System.currentTimeMillis();
if (!isEmpty(inProgressEnvList)) {
List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream()
- .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
- .collect(Collectors.toList());
- staleInProgressEnvList.parallelStream()
- .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
+ .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
+ .collect(Collectors.toList());
+ staleInProgressEnvList.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
}
-
}
-
}
private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) {
log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus);
}
-
-
}
-
- @Override
- public ExecutorService getExecutorService() {
- return scheduledService;
- }
-
}