diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java | 70 |
1 files changed, 50 insertions, 20 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 771dea52..c733cb0d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; @@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable; * configuration file. */ @Component +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class RefreshConfigTask { private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class); @@ -119,14 +120,14 @@ public class RefreshConfigTask { } Flux<RicConfigUpdate.Type> createRefreshTask() { - Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux<JsonObject> loadFromFile = regularInterval() // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux<JsonObject> loadFromConsul = regularInterval() // .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // .flatMap(this::getFromCbs) // @@ -135,14 +136,21 @@ public class RefreshConfigTask { .doOnNext(json -> this.isConsulUsed = true) // .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); + final int CONCURRENCY = 50; // Number of RIC synched in paralell + return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // - .flatMap(this::updateConfig) // - .doOnNext(this::handleUpdatedRicConfig) // - .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // + .flatMap(this::updateConfig, CONCURRENCY) // + .flatMap(this::handleUpdatedRicConfig) // .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } + private Flux<Long> regularInterval() { + return Flux.interval(Duration.ZERO, configRefreshInterval) // + .onBackpressureDrop() // + .limitRate(1); // Limit so that only one event is emitted at a time + } + Mono<EnvProperties> getEnvironment(Properties systemEnvironment) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // .onErrorResume(t -> Mono.empty()); @@ -192,19 +200,43 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, - restClientFactory, rics); - synch.run(ric); + synchronizationTask().run(ric); } } - private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { + private RicSynchronizationTask synchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); + } + + /** + * for an added RIC after a restart it is nesessary to get the suypported policy + * types from the RIC unless a full synchronization is wanted. + * + * @param ric the ric to get supprted types from + * @return the same ric + */ + private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) { + logger.debug("Synchronizing policy types for new RIC: {}", ric.id()); + // Synchronize the policy types + return this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // + .collectList() // + .flatMap(list -> Mono.just(ric)) // + .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), + t.getMessage())) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { synchronized (this.rics) { String ricId = updatedInfo.getRicConfig().ricId(); RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - addRic(updatedInfo.getRicConfig()); + Ric ric = new Ric(updatedInfo.getRicConfig()); + return trySyncronizeSupportedTypes(ric) // + .flatMap(this::addRic) // + .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -215,27 +247,25 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(updatedInfo.getRicConfig()); + addRic(new Ric(updatedInfo.getRicConfig())).block(); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } } + return Mono.just(event); } } - private void addRic(RicConfig config) { - Ric ric = new Ric(config); + Mono<Ric> addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { this.policies.restoreFromDatabase(ric, this.policyTypes); } - runRicSynchronization(ric); - } + logger.debug("Added RIC: {}", ric.id()); + + ric.setState(RicState.AVAILABLE); - void runRicSynchronization(Ric ric) { - RicSynchronizationTask synchronizationTask = - new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); - synchronizationTask.run(ric); + return Mono.just(ric); } /** |