diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2022-05-11 13:10:40 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2022-08-17 12:53:13 +0200 |
commit | 97ace6245fb8b7238d2f7f871797ba03df2d435f (patch) | |
tree | 94f6f437cd0e62b192a23eabc58d95fc71f5b82b /a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java | |
parent | a3e382b49db0cbdee32396cf9c7028d9f9b4a231 (diff) |
NONRTRIC PMS, Cherry-pick the recent changes into Jakarta Release1.3.3
Sqasch of cherrypicked commits for the maintenance release.
Issue-ID: CCSDK-3742
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS added support for custom A1 adapters
Added support for added external A1-P adapter. This makes it possible to design and include
adapter to APIs for accessing of A1 policies (in a NearRT-RIC) without any changes in this
SW.
Issue-ID: CCSDK-3655
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS added support for custom A1 adapters
Updates of the json schema for configuration. Made it stricter and added the customAdapterClass prpoperty.
Issue-ID: CCSDK-3655
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS removalof usage of immutable
Issue-ID: CCSDK-3629
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS, Sporadic instability
Attempt to stablize the synch.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS, Sporadic instability
Attempt to stablize the synch.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS, Sporadic instability
Some further simplifications and added test.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
NONRTRIC PMS, updated SDNC rest interface
Update path and output-json for SDNC rest interface - A1 Kohn
Issue-ID: CCSDK-3193
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS, Bugfix
If the auth-token-file parameter in the file application.yaml is missing, it would not default to an empty file name.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
NONRTRIC PMS, updated certs
Updated certificate (which was expired).
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I34ffc932d855ba3b94cfff23dcb56f30780dbecc
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java | 91 |
1 files changed, 40 insertions, 51 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 6305abf0..b3afa7cd 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -36,8 +36,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; @@ -78,61 +78,69 @@ public class RicSynchronizationTask { } public void run(Ric ric) { - logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId()); + logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId()); if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); + logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId()); return; } ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // .flatMap(notUsed -> synchronizeRic(ric)) // - .subscribe(new BaseSubscriber<Object>() { - - @Override - protected void hookFinally(SignalType type) { - ric.getLock().unlockBlocking(); - } - }); + .doFinally(sig -> ric.getLock().unlockBlocking()) // + .subscribe(); } public Mono<Ric> synchronizeRic(Ric ric) { - return setRicState(ric) // - .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // + return this.a1ClientFactory.createA1Client(ric) // + .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // - .collectList() // - .map(notUsed -> ric) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // - ric.setState(RicState.UNAVAILABLE); // + deletePoliciesIfNotRecreatable(t, ric); }) // + .collectList() // .flatMap(notUsed -> onSynchronizationComplete(ric)) // - .onErrorResume(t -> Mono.just(ric)); + .onErrorResume(t -> Mono.just(ric)) // + .doFinally(signal -> onFinally(signal, ric)); + } + + private void onFinally(SignalType signal, Ric ric) { + if (ric.getState().equals(RicState.SYNCHRONIZING)) { + logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal); + ric.setState(RicState.UNAVAILABLE); // + } + } + + /** + * If a 4xx error is received, allpolicies are deleted. This is just to avoid + * cyclical receovery due to that the NearRT RIC cannot accept a previously + * policy. + */ + private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) { + if (throwable instanceof WebClientResponseException) { + WebClientResponseException responseException = (WebClientResponseException) throwable; + if (responseException.getStatusCode().is4xxClientError()) { + deleteAllPoliciesInRepository(ric); + } + } + } + + private void deleteAllPoliciesInRepository(Ric ric) { + for (Policy policy : policies.getForRic(ric.id())) { + this.policies.remove(policy); + } } public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { return a1Client.getPolicyTypeIdentities() // .doOnNext(x -> ric.clearSupportedPolicyTypes()) // .flatMapMany(Flux::fromIterable) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().getRicId(), typeId)) // .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // .doOnNext(ric::addSupportedPolicyType); // } - @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields - private Mono<Ric> setRicState(Ric ric) { - synchronized (ric) { - if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); - return Mono.empty(); - } - logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId()); - ric.setState(RicState.SYNCHRONIZING); - return Mono.just(ric); - } - } - private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) { Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client); Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(); @@ -154,19 +162,6 @@ public class RicSynchronizationTask { .map(list -> ric); } - private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); - deleteAllPoliciesInRepository(ric); - - Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client)); - Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(A1Client::deleteAllPolicies) // - .doOnComplete(() -> deleteAllPoliciesInRepository(ric)); - - return Flux.concat(synchronizedTypes, deletePoliciesInRic); - } - private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -181,14 +176,8 @@ public class RicSynchronizationTask { return pt; } - private void deleteAllPoliciesInRepository(Ric ric) { - for (Policy policy : policies.getForRic(ric.id())) { - this.policies.remove(policy); - } - } - private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) { - logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); + logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId()); return a1Client.putPolicy(policy) // .flatMapMany(notUsed -> Flux.just(policy)); } |