aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java91
1 files changed, 40 insertions, 51 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 6305abf0..b3afa7cd 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -36,8 +36,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
-import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
@@ -78,61 +78,69 @@ public class RicSynchronizationTask {
}
public void run(Ric ric) {
- logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
+ logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId());
if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId());
return;
}
ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
.flatMap(notUsed -> synchronizeRic(ric)) //
- .subscribe(new BaseSubscriber<Object>() {
-
- @Override
- protected void hookFinally(SignalType type) {
- ric.getLock().unlockBlocking();
- }
- });
+ .doFinally(sig -> ric.getLock().unlockBlocking()) //
+ .subscribe();
}
public Mono<Ric> synchronizeRic(Ric ric) {
- return setRicState(ric) //
- .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ return this.a1ClientFactory.createA1Client(ric) //
+ .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
- .collectList() //
- .map(notUsed -> ric) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
- ric.setState(RicState.UNAVAILABLE); //
+ deletePoliciesIfNotRecreatable(t, ric);
}) //
+ .collectList() //
.flatMap(notUsed -> onSynchronizationComplete(ric)) //
- .onErrorResume(t -> Mono.just(ric));
+ .onErrorResume(t -> Mono.just(ric)) //
+ .doFinally(signal -> onFinally(signal, ric));
+ }
+
+ private void onFinally(SignalType signal, Ric ric) {
+ if (ric.getState().equals(RicState.SYNCHRONIZING)) {
+ logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal);
+ ric.setState(RicState.UNAVAILABLE); //
+ }
+ }
+
+ /**
+ * If a 4xx error is received, allpolicies are deleted. This is just to avoid
+ * cyclical receovery due to that the NearRT RIC cannot accept a previously
+ * policy.
+ */
+ private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) {
+ if (throwable instanceof WebClientResponseException) {
+ WebClientResponseException responseException = (WebClientResponseException) throwable;
+ if (responseException.getStatusCode().is4xxClientError()) {
+ deleteAllPoliciesInRepository(ric);
+ }
+ }
+ }
+
+ private void deleteAllPoliciesInRepository(Ric ric) {
+ for (Policy policy : policies.getForRic(ric.id())) {
+ this.policies.remove(policy);
+ }
}
public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
return a1Client.getPolicyTypeIdentities() //
.doOnNext(x -> ric.clearSupportedPolicyTypes()) //
.flatMapMany(Flux::fromIterable) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().getRicId(), typeId)) //
.flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
.doOnNext(ric::addSupportedPolicyType); //
}
- @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
- private Mono<Ric> setRicState(Ric ric) {
- synchronized (ric) {
- if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
- return Mono.empty();
- }
- logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
- ric.setState(RicState.SYNCHRONIZING);
- return Mono.just(ric);
- }
- }
-
private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
@@ -154,19 +162,6 @@ public class RicSynchronizationTask {
.map(list -> ric);
}
- private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
- deleteAllPoliciesInRepository(ric);
-
- Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
- Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(A1Client::deleteAllPolicies) //
- .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
-
- return Flux.concat(synchronizedTypes, deletePoliciesInRic);
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
@@ -181,14 +176,8 @@ public class RicSynchronizationTask {
return pt;
}
- private void deleteAllPoliciesInRepository(Ric ric) {
- for (Policy policy : policies.getForRic(ric.id())) {
- this.policies.remove(policy);
- }
- }
-
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
- logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+ logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId());
return a1Client.putPolicy(policy) //
.flatMapMany(notUsed -> Flux.just(policy));
}