summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-06-10 12:43:06 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2022-06-10 13:11:04 +0200
commit87b1ff527a8110f592b4b5e2fcf13b8cd43b6d1f (patch)
treef375243d253097b84250550b13ee7558936087e9 /a1-policy-management/src/main/java
parent779509036cdadb5735d336ed93a3bfac5a0be72a (diff)
NONRTRIC PMS, Sporadic instability
Some further simplifications and added test. Issue-ID: CCSDK-3683 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech> Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
Diffstat (limited to 'a1-policy-management/src/main/java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java47
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java5
2 files changed, 8 insertions, 44 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index fea242ae..983e92e6 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -36,11 +36,9 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
@@ -130,7 +128,7 @@ public class RefreshConfigTask {
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig, CONCURRENCY) //
.flatMap(this::handleUpdatedRicConfig) //
- .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+ .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal));
}
private Flux<Long> regularInterval() {
@@ -170,40 +168,16 @@ public class RefreshConfigTask {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
}
- /**
- * for an added RIC after a restart it is nesessary to get the suypported policy
- * types from the RIC unless a full synchronization is wanted.
- *
- * @param ric the ric to get supprted types from
- * @return the same ric
- */
- private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
- logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
- // Synchronize the policy types
- ric.setState(RicState.SYNCHRONIZING);
- return this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
- .collectList() //
- .map(list -> ric) //
- .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
- .doOnError(t -> {
- logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
- ric.setState(RicState.UNAVAILABLE); //
- }) //
- .onErrorResume(t -> Mono.just(ric));
- }
-
public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().getRicId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
-
- return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
- .doOnNext(this::addRic) //
- .flatMap(this::notifyServicesRicAvailable) //
- .flatMap(notUsed -> Mono.just(event));
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ this.addRic(ric);
+ return this.synchronizationTask().synchronizeRic(ric) //
+ .map(notUsed -> event);
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -231,17 +205,6 @@ public class RefreshConfigTask {
logger.debug("Added RIC: {}", ric.id());
}
- private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
- if (ric.getState() == RicState.AVAILABLE) {
- ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
- return callbacks.notifyServicesRicAvailable(ric, services) //
- .collectList() //
- .map(list -> ric);
- } else {
- return Mono.just(ric);
- }
- }
-
/**
* Reads the configuration from file.
*/
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index 177778b8..e3edaf44 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -106,11 +106,12 @@ public class RicSupervision {
createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
}
- private Flux<RicData> createTask() {
+ private Flux<Ric> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.onErrorResume(t -> Flux.empty()) //
- .flatMap(this::checkOneRic, CONCURRENCY);
+ .flatMap(this::checkOneRic, CONCURRENCY) //
+ .map(ricData -> ricData.ric);
}
private Mono<RicData> checkOneRic(RicData ricData) {