diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2022-06-10 12:43:06 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2022-06-10 13:11:04 +0200 |
commit | 87b1ff527a8110f592b4b5e2fcf13b8cd43b6d1f (patch) | |
tree | f375243d253097b84250550b13ee7558936087e9 /a1-policy-management/src/main | |
parent | 779509036cdadb5735d336ed93a3bfac5a0be72a (diff) |
NONRTRIC PMS, Sporadic instability
Some further simplifications and added test.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
Diffstat (limited to 'a1-policy-management/src/main')
2 files changed, 8 insertions, 44 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index fea242ae..983e92e6 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -36,11 +36,9 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; -import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; -import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; @@ -130,7 +128,7 @@ public class RefreshConfigTask { .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig, CONCURRENCY) // .flatMap(this::handleUpdatedRicConfig) // - .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); + .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal)); } private Flux<Long> regularInterval() { @@ -170,40 +168,16 @@ public class RefreshConfigTask { return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); } - /** - * for an added RIC after a restart it is nesessary to get the suypported policy - * types from the RIC unless a full synchronization is wanted. - * - * @param ric the ric to get supprted types from - * @return the same ric - */ - private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) { - logger.debug("Synchronizing policy types for new RIC: {}", ric.id()); - // Synchronize the policy types - ric.setState(RicState.SYNCHRONIZING); - return this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // - .collectList() // - .map(list -> ric) // - .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) // - .doOnError(t -> { - logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage()); - ric.setState(RicState.UNAVAILABLE); // - }) // - .onErrorResume(t -> Mono.just(ric)); - } - public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { synchronized (this.rics) { String ricId = updatedInfo.getRicConfig().getRicId(); RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - - return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) // - .doOnNext(this::addRic) // - .flatMap(this::notifyServicesRicAvailable) // - .flatMap(notUsed -> Mono.just(event)); + Ric ric = new Ric(updatedInfo.getRicConfig()); + this.addRic(ric); + return this.synchronizationTask().synchronizeRic(ric) // + .map(notUsed -> event); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -231,17 +205,6 @@ public class RefreshConfigTask { logger.debug("Added RIC: {}", ric.id()); } - private Mono<Ric> notifyServicesRicAvailable(Ric ric) { - if (ric.getState() == RicState.AVAILABLE) { - ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); - return callbacks.notifyServicesRicAvailable(ric, services) // - .collectList() // - .map(list -> ric); - } else { - return Mono.just(ric); - } - } - /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 177778b8..e3edaf44 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -106,11 +106,12 @@ public class RicSupervision { createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed")); } - private Flux<RicData> createTask() { + private Flux<Ric> createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // .onErrorResume(t -> Flux.empty()) // - .flatMap(this::checkOneRic, CONCURRENCY); + .flatMap(this::checkOneRic, CONCURRENCY) // + .map(ricData -> ricData.ric); } private Mono<RicData> checkOneRic(RicData ricData) { |