diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java | 61 |
1 files changed, 33 insertions, 28 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 19222377..6ac104ca 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -78,7 +78,7 @@ public class RicSynchronizationTask { } public void run(Ric ric) { - logger.debug("Handling ric: {}", ric.getConfig().ricId()); + logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId()); if (ric.getState() == RicState.SYNCHRONIZING) { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); @@ -86,22 +86,8 @@ public class RicSynchronizationTask { } ric.getLock().lock(LockType.EXCLUSIVE) // - .flatMap(notUsed -> setRicState(ric)) // - .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // - .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) + .flatMap(notUsed -> synchronizeRic(ric)) // .subscribe(new BaseSubscriber<Object>() { - @Override - protected void hookOnError(Throwable throwable) { - logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), - throwable.getMessage()); - ric.setState(RicState.UNAVAILABLE); - } - - @Override - protected void hookOnComplete() { - onSynchronizationComplete(ric); - } @Override protected void hookFinally(SignalType type) { @@ -110,6 +96,31 @@ public class RicSynchronizationTask { }); } + public Mono<Ric> synchronizeRic(Ric ric) { + return Mono.just(ric) // + .flatMap(notUsed -> setRicState(ric)) // + .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // + .flatMapMany(client -> runSynchronization(ric, client)) // + .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // + .collectList() // + .flatMap(notUsed -> Mono.just(ric)) // + .doOnError(t -> { // + logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // + ric.setState(RicState.UNAVAILABLE); // + }) // + .doOnNext(notUsed -> onSynchronizationComplete(ric)) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { + return a1Client.getPolicyTypeIdentities() // + .doOnNext(x -> ric.clearSupportedPolicyTypes()) // + .flatMapMany(Flux::fromIterable) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // + .doOnNext(ric::addSupportedPolicyType); // + } + @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields private Mono<Ric> setRicState(Ric ric) { synchronized (ric) { @@ -117,6 +128,7 @@ public class RicSynchronizationTask { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); return Mono.empty(); } + logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId()); ric.setState(RicState.SYNCHRONIZING); return Mono.just(ric); } @@ -141,7 +153,7 @@ public class RicSynchronizationTask { } private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); + logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); deleteAllPoliciesInRepository(ric); Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // @@ -158,15 +170,6 @@ public class RicSynchronizationTask { callbacks.notifyServicesRicSynchronized(ric, services); } - private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { - return a1Client.getPolicyTypeIdentities() // - .doOnNext(x -> ric.clearSupportedPolicyTypes()) // - .flatMapMany(Flux::fromIterable) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // - .doOnNext(ric::addSupportedPolicyType); // - } - private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -188,7 +191,7 @@ public class RicSynchronizationTask { } private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) { - logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); + logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); return a1Client.putPolicy(policy) // .flatMapMany(notUsed -> Flux.just(policy)); } @@ -202,8 +205,10 @@ public class RicSynchronizationTask { private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.id())) // + .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) // .filter(policy -> !checkTransient(policy)) // - .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC) + .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage())); } } |