diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 94e873ed..2d282f33 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -93,17 +94,36 @@ public class RicSynchronizationTask { return this.a1ClientFactory.createA1Client(ric) // .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // - .collectList() // - .map(notUsed -> ric) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // ric.setState(RicState.UNAVAILABLE); // + deletePoliciesIfNotRecreatable(t, ric); }) // + .collectList() // .flatMap(notUsed -> onSynchronizationComplete(ric)) // .onErrorResume(t -> Mono.just(ric)); } + /** + * If a 4xx error is received, allpolicies are deleted. This is just to avoid + * cyclical receovery due to that the NearRT RIC cannot accept a previously + * policy. + */ + private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) { + if (throwable instanceof WebClientResponseException) { + WebClientResponseException responseException = (WebClientResponseException) throwable; + if (responseException.getStatusCode().is4xxClientError()) { + deleteAllPoliciesInRepository(ric); + } + } + } + + private void deleteAllPoliciesInRepository(Ric ric) { + for (Policy policy : policies.getForRic(ric.id())) { + this.policies.remove(policy); + } + } + public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { return a1Client.getPolicyTypeIdentities() // .doOnNext(x -> ric.clearSupportedPolicyTypes()) // @@ -134,19 +154,6 @@ public class RicSynchronizationTask { .map(list -> ric); } - private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); - deleteAllPoliciesInRepository(ric); - - Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client)); - Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(A1Client::deleteAllPolicies) // - .doOnComplete(() -> deleteAllPoliciesInRepository(ric)); - - return Flux.concat(synchronizedTypes, deletePoliciesInRic); - } - private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -161,12 +168,6 @@ public class RicSynchronizationTask { return pt; } - private void deleteAllPoliciesInRepository(Ric ric) { - for (Policy policy : policies.getForRic(ric.id())) { - this.policies.remove(policy); - } - } - private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) { logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId()); return a1Client.putPolicy(policy) // |