aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java70
1 files changed, 50 insertions, 20 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 771dea52..c733cb0d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
@@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable;
* configuration file.
*/
@Component
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class RefreshConfigTask {
private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
@@ -119,14 +120,14 @@ public class RefreshConfigTask {
}
Flux<RicConfigUpdate.Type> createRefreshTask() {
- Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromFile = regularInterval() //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreErrorFlux) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromConsul = regularInterval() //
.flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
.flatMap(this::getFromCbs) //
@@ -135,14 +136,21 @@ public class RefreshConfigTask {
.doOnNext(json -> this.isConsulUsed = true) //
.doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+ final int CONCURRENCY = 50; // Number of RIC synched in paralell
+
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
- .flatMap(this::updateConfig) //
- .doOnNext(this::handleUpdatedRicConfig) //
- .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+ .flatMap(this::updateConfig, CONCURRENCY) //
+ .flatMap(this::handleUpdatedRicConfig) //
.doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
+ private Flux<Long> regularInterval() {
+ return Flux.interval(Duration.ZERO, configRefreshInterval) //
+ .onBackpressureDrop() //
+ .limitRate(1); // Limit so that only one event is emitted at a time
+ }
+
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
.onErrorResume(t -> Mono.empty());
@@ -192,19 +200,43 @@ public class RefreshConfigTask {
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services,
- restClientFactory, rics);
- synch.run(ric);
+ synchronizationTask().run(ric);
}
}
- private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ private RicSynchronizationTask synchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
+ }
+
+ /**
+ * for an added RIC after a restart it is nesessary to get the suypported policy
+ * types from the RIC unless a full synchronization is wanted.
+ *
+ * @param ric the ric to get supprted types from
+ * @return the same ric
+ */
+ private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
+ logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
+ // Synchronize the policy types
+ return this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric)) //
+ .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(),
+ t.getMessage())) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().ricId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
- addRic(updatedInfo.getRicConfig());
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ return trySyncronizeSupportedTypes(ric) //
+ .flatMap(this::addRic) //
+ .flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -215,27 +247,25 @@ public class RefreshConfigTask {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(updatedInfo.getRicConfig());
+ addRic(new Ric(updatedInfo.getRicConfig())).block();
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
+ return Mono.just(event);
}
}
- private void addRic(RicConfig config) {
- Ric ric = new Ric(config);
+ Mono<Ric> addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
- runRicSynchronization(ric);
- }
+ logger.debug("Added RIC: {}", ric.id());
+
+ ric.setState(RicState.AVAILABLE);
- void runRicSynchronization(Ric ric) {
- RicSynchronizationTask synchronizationTask =
- new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
- synchronizationTask.run(ric);
+ return Mono.just(ric);
}
/**