diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java | 86 |
1 files changed, 40 insertions, 46 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 8926ec16..e3edaf44 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -64,11 +64,11 @@ public class RicSupervision { private final Services services; private final AsyncRestClientFactory restClientFactory; - private static class SynchStartedException extends ServiceException { + private static class SynchNeededException extends ServiceException { private static final long serialVersionUID = 1L; - public SynchStartedException(String message) { - super(message); + public SynchNeededException(RicData ric) { + super("SynchNeededException for " + ric.ric.id()); } } @@ -106,68 +106,61 @@ public class RicSupervision { createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed")); } - private Flux<RicData> createTask() { + private Flux<Ric> createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkOneRic, CONCURRENCY); + .onErrorResume(t -> Flux.empty()) // + .flatMap(this::checkOneRic, CONCURRENCY) // + .map(ricData -> ricData.ric); } private Mono<RicData> checkOneRic(RicData ricData) { - return checkRicState(ricData) // - .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic")) // - .flatMap(notUsed -> setRicState(ricData)) // + if (ricData.ric.getState() == RicState.CONSISTENCY_CHECK || ricData.ric.getState() == RicState.SYNCHRONIZING) { + logger.debug("Skipping check ric: {}, state: {}", ricData.ric.id(), ricData.ric.getState()); + return Mono.empty(); // Skip, already in progress + } + return ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic") // + .flatMap(lock -> synchIfUnavailable(ricData)) // + .doOnNext(ric -> ricData.ric.setState(RicState.CONSISTENCY_CHECK)) // .flatMap(x -> checkRicPolicies(ricData)) // .flatMap(x -> checkRicPolicyTypes(ricData)) // .doOnNext(x -> onRicCheckedOk(ricData)) // - .doOnError(t -> onRicCheckedError(t, ricData)) // + .onErrorResume(t -> onRicCheckedError(t, ricData)) // + .doFinally(sig -> ricData.ric.getLock().unlockBlocking()) // .onErrorResume(throwable -> Mono.empty()); } - private void onRicCheckedError(Throwable t, RicData ricData) { + private Mono<RicData> synchIfUnavailable(RicData ric) { + if (ric.ric.getState() == RicState.UNAVAILABLE) { + return Mono.error(new SynchNeededException(ric)); + } else { + return Mono.just(ric); + } + } + + private Mono<RicData> onRicCheckedError(Throwable t, RicData ricData) { logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage()); - if (!(t instanceof SynchStartedException)) { - // If synch is started, the synch will set the final state - ricData.ric.setState(RicState.UNAVAILABLE); + ricData.ric.setState(RicState.UNAVAILABLE); + if ((t instanceof SynchNeededException)) { + return startSynchronization(ricData); + } else { + logger.warn("RicSupervision, ric: {}, exception: {}", ricData.ric.id(), t.getMessage()); + return Mono.empty(); } - ricData.ric.getLock().unlockBlocking(); } private void onRicCheckedOk(RicData ricData) { logger.debug("Ric: {} checked OK", ricData.ric.id()); ricData.ric.setState(RicState.AVAILABLE); - ricData.ric.getLock().unlockBlocking(); - } - - @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields - private Mono<RicData> setRicState(RicData ric) { - synchronized (ric) { - if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) { - logger.debug("Ric: {} is already being checked", ric.ric.getConfig().ricId()); - return Mono.empty(); - } - ric.ric.setState(RicState.CONSISTENCY_CHECK); - return Mono.just(ric); - } } private Mono<RicData> createRicData(Ric ric) { - return Mono.just(ric) // - .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // + return this.a1ClientFactory.createA1Client(ric) // + .doOnError(t -> logger.debug("Could not create A1 client for ric: {}, reason: {}", ric.id(), + t.getMessage())) // .map(a1Client -> new RicData(ric, a1Client)); } - private Mono<RicData> checkRicState(RicData ric) { - if (ric.ric.getState() == RicState.UNAVAILABLE) { - logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id()); - return startSynchronization(ric) // - .onErrorResume(t -> Mono.empty()); - } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) { - return Mono.empty(); - } else { - return Mono.just(ric); - } - } - private Mono<RicData> checkRicPolicies(RicData ric) { return ric.getClient().getPolicyIdentities() // .flatMap(ricP -> validateInstances(ricP, ric)); @@ -178,14 +171,14 @@ public class RicSupervision { if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) { logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})", ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size()); - return startSynchronization(ric); + return Mono.error(new SynchNeededException(ric)); } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})", ric.ric.id(), policyId); - return startSynchronization(ric); + return Mono.error(new SynchNeededException(ric)); } } return Mono.just(ric); @@ -202,22 +195,23 @@ public class RicSupervision { logger.debug( "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})", ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size()); - return startSynchronization(ric); + return Mono.error(new SynchNeededException(ric)); } for (String typeName : ricTypes) { if (!ric.ric.isSupportingType(typeName)) { logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})", ric.ric.id(), typeName); - return startSynchronization(ric); + return Mono.error(new SynchNeededException(ric)); } } return Mono.just(ric); } private Mono<RicData> startSynchronization(RicData ric) { + logger.debug("RicSupervision, starting ric: {} synchronization, state: {}", ric.ric.id(), ric.ric.getState()); RicSynchronizationTask synchronizationTask = createSynchronizationTask(); return synchronizationTask.synchronizeRic(ric.ric) // - .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started"))); + .flatMap(notUsed -> Mono.just(ric)); } |