aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java61
1 files changed, 33 insertions, 28 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 19222377..6ac104ca 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -78,7 +78,7 @@ public class RicSynchronizationTask {
}
public void run(Ric ric) {
- logger.debug("Handling ric: {}", ric.getConfig().ricId());
+ logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
if (ric.getState() == RicState.SYNCHRONIZING) {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
@@ -86,22 +86,8 @@ public class RicSynchronizationTask {
}
ric.getLock().lock(LockType.EXCLUSIVE) //
- .flatMap(notUsed -> setRicState(ric)) //
- .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .flatMap(notUsed -> synchronizeRic(ric)) //
.subscribe(new BaseSubscriber<Object>() {
- @Override
- protected void hookOnError(Throwable throwable) {
- logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(),
- throwable.getMessage());
- ric.setState(RicState.UNAVAILABLE);
- }
-
- @Override
- protected void hookOnComplete() {
- onSynchronizationComplete(ric);
- }
@Override
protected void hookFinally(SignalType type) {
@@ -110,6 +96,31 @@ public class RicSynchronizationTask {
});
}
+ public Mono<Ric> synchronizeRic(Ric ric) {
+ return Mono.just(ric) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
+ .collectList() //
+ .flatMap(notUsed -> Mono.just(ric)) //
+ .doOnError(t -> { //
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
+ ric.setState(RicState.UNAVAILABLE); //
+ }) //
+ .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+ return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+ .doOnNext(ric::addSupportedPolicyType); //
+ }
+
@SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
private Mono<Ric> setRicState(Ric ric) {
synchronized (ric) {
@@ -117,6 +128,7 @@ public class RicSynchronizationTask {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
return Mono.empty();
}
+ logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
ric.setState(RicState.SYNCHRONIZING);
return Mono.just(ric);
}
@@ -141,7 +153,7 @@ public class RicSynchronizationTask {
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -158,15 +170,6 @@ public class RicSynchronizationTask {
callbacks.notifyServicesRicSynchronized(ric, services);
}
- private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyTypeIdentities() //
- .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
- .doOnNext(ric::addSupportedPolicyType); //
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
@@ -188,7 +191,7 @@ public class RicSynchronizationTask {
}
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
- logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+ logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
return a1Client.putPolicy(policy) //
.flatMapMany(notUsed -> Flux.just(policy));
}
@@ -202,8 +205,10 @@ public class RicSynchronizationTask {
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.id())) //
+ .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) //
.filter(policy -> !checkTransient(policy)) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC)
+ .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage()));
}
}