summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2021-04-14 20:16:35 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2021-04-20 15:57:00 +0200
commitb28e811178bf9d828615f62c67f30a78c0414eb1 (patch)
treea7ad6e4fe739f8369d73ece11e80c669395e4b15 /a1-policy-management/src/main
parentab7baa0563069bc403c840b39f22a9e7e900fb72 (diff)
PMS Persistent storage of policies and type definitions - A1 Istanbul
Bugfix,improved traces, avoiding synch for RICs after restart. Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e Issue-ID: CCSDK-3256 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management/src/main')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java13
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java70
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java24
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java61
8 files changed, 113 insertions, 67 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
index 5bfe677e..170ade6f 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
@@ -136,8 +136,8 @@ public class ApplicationConfig {
@Getter
private final Type type;
- RicConfigUpdate(RicConfig ric, Type event) {
- this.ricConfig = ric;
+ public RicConfigUpdate(RicConfig config, Type event) {
+ this.ricConfig = config;
this.type = event;
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
index 4e42550c..2edd1e51 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
@@ -141,9 +141,9 @@ public class ServiceController {
}
}
- @Operation(summary = "Unregisters a service")
+ @Operation(summary = "Unregister a service")
@ApiResponses(value = { //
- @ApiResponse(responseCode = "204", description = "Service unregisterred", //
+ @ApiResponse(responseCode = "204", description = "Service unregistered", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))),
@ApiResponse(responseCode = "404", description = "Service not found", //
content = @Content(schema = @Schema(implementation = String.class)))})
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
index a24c5bd0..45aa57e4 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
@@ -25,7 +25,6 @@ import com.google.gson.GsonBuilder;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
@@ -49,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.FileSystemUtils;
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@Configuration
public class Policies {
@@ -132,7 +132,7 @@ public class Policies {
if (!policy.isTransient()) {
try {
Files.delete(getPath(policy));
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.debug("Could not delete policy from database: {}", e.getMessage());
}
}
@@ -162,7 +162,7 @@ public class Policies {
if (this.appConfig.getVardataDirectory() != null) {
FileSystemUtils.deleteRecursively(getDatabasePath());
}
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.warn("Could not delete policy database : {}", e.getMessage());
}
}
@@ -186,8 +186,7 @@ public class Policies {
return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json");
}
- public void restoreFromDatabase(Ric ric, PolicyTypes types) {
-
+ public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) {
try {
Files.createDirectories(getDatabasePath(ric));
for (File file : getDatabasePath(ric).toFile().listFiles()) {
@@ -195,7 +194,9 @@ public class Policies {
PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class);
this.put(toPolicy(policyStorage, ric, types));
}
- } catch (ServiceException | IOException e) {
+ logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
+ this.policiesRic.get(ric.id()).size());
+ } catch (Exception e) {
logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
index 76f0e216..ad3270cb 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
@@ -119,7 +119,7 @@ public class PolicyTypes {
PolicyType type = gson.fromJson(json, PolicyType.class);
this.types.put(type.getId(), type);
}
-
+ logger.debug("Restored type database,no of types: {}", this.types.size());
} catch (IOException e) {
logger.warn("Could not restore policy type database : {}", e.getMessage());
} catch (ServiceException e) {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
index 9c4b2750..f9af2393 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
@@ -78,7 +78,7 @@ public class Ric {
* @return a vector containing the nodes managed by this Ric.
*/
public synchronized Collection<String> getManagedElementIds() {
- return ricConfig.managedElementIds();
+ return new Vector<>(ricConfig.managedElementIds());
}
/**
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 771dea52..c733cb0d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
@@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable;
* configuration file.
*/
@Component
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class RefreshConfigTask {
private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
@@ -119,14 +120,14 @@ public class RefreshConfigTask {
}
Flux<RicConfigUpdate.Type> createRefreshTask() {
- Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromFile = regularInterval() //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreErrorFlux) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromConsul = regularInterval() //
.flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
.flatMap(this::getFromCbs) //
@@ -135,14 +136,21 @@ public class RefreshConfigTask {
.doOnNext(json -> this.isConsulUsed = true) //
.doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+ final int CONCURRENCY = 50; // Number of RIC synched in paralell
+
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
- .flatMap(this::updateConfig) //
- .doOnNext(this::handleUpdatedRicConfig) //
- .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+ .flatMap(this::updateConfig, CONCURRENCY) //
+ .flatMap(this::handleUpdatedRicConfig) //
.doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
+ private Flux<Long> regularInterval() {
+ return Flux.interval(Duration.ZERO, configRefreshInterval) //
+ .onBackpressureDrop() //
+ .limitRate(1); // Limit so that only one event is emitted at a time
+ }
+
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
.onErrorResume(t -> Mono.empty());
@@ -192,19 +200,43 @@ public class RefreshConfigTask {
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services,
- restClientFactory, rics);
- synch.run(ric);
+ synchronizationTask().run(ric);
}
}
- private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ private RicSynchronizationTask synchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
+ }
+
+ /**
+ * for an added RIC after a restart it is nesessary to get the suypported policy
+ * types from the RIC unless a full synchronization is wanted.
+ *
+ * @param ric the ric to get supprted types from
+ * @return the same ric
+ */
+ private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
+ logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
+ // Synchronize the policy types
+ return this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric)) //
+ .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(),
+ t.getMessage())) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().ricId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
- addRic(updatedInfo.getRicConfig());
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ return trySyncronizeSupportedTypes(ric) //
+ .flatMap(this::addRic) //
+ .flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -215,27 +247,25 @@ public class RefreshConfigTask {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(updatedInfo.getRicConfig());
+ addRic(new Ric(updatedInfo.getRicConfig())).block();
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
+ return Mono.just(event);
}
}
- private void addRic(RicConfig config) {
- Ric ric = new Ric(config);
+ Mono<Ric> addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
- runRicSynchronization(ric);
- }
+ logger.debug("Added RIC: {}", ric.id());
+
+ ric.setState(RicState.AVAILABLE);
- void runRicSynchronization(Ric ric) {
- RicSynchronizationTask synchronizationTask =
- new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
- synchronizationTask.run(ric);
+ return Mono.just(ric);
}
/**
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index c13df8c3..87689012 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -55,6 +55,7 @@ import reactor.core.publisher.Mono;
public class RicSupervision {
private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
+ private static final int CONCURRENCY = 50; // Number of RIC checked in paralell
private final Rics rics;
private final Policies policies;
private final PolicyTypes policyTypes;
@@ -107,7 +108,7 @@ public class RicSupervision {
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkOneRic);
+ .flatMap(this::checkOneRic, CONCURRENCY);
}
private Mono<RicData> checkOneRic(RicData ricData) {
@@ -123,10 +124,8 @@ public class RicSupervision {
private void onRicCheckedError(Throwable t, RicData ricData) {
logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
- if (t instanceof SynchStartedException) {
- // this is just a temporary state,
- ricData.ric.setState(RicState.AVAILABLE);
- } else {
+ if (!(t instanceof SynchStartedException)) {
+ // If synch is started, the synch will set the final state
ricData.ric.setState(RicState.UNAVAILABLE);
}
ricData.ric.getLock().unlockBlocking();
@@ -158,6 +157,7 @@ public class RicSupervision {
private Mono<RicData> checkRicState(RicData ric) {
if (ric.ric.getState() == RicState.UNAVAILABLE) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id());
return startSynchronization(ric) //
.onErrorResume(t -> Mono.empty());
} else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
@@ -175,11 +175,15 @@ public class RicSupervision {
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})",
+ ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size());
return startSynchronization(ric);
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})",
+ ric.ric.id(), policyId);
return startSynchronization(ric);
}
}
@@ -194,10 +198,15 @@ public class RicSupervision {
private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
+ logger.debug(
+ "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})",
+ ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size());
return startSynchronization(ric);
}
for (String typeName : ricTypes) {
if (!ric.ric.isSupportingType(typeName)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})",
+ ric.ric.id(), typeName);
return startSynchronization(ric);
}
}
@@ -206,8 +215,9 @@ public class RicSupervision {
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask synchronizationTask = createSynchronizationTask();
- synchronizationTask.run(ric.ric);
- return Mono.error(new SynchStartedException("Syncronization started"));
+ return synchronizationTask.synchronizeRic(ric.ric) //
+ .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started")));
+
}
RicSynchronizationTask createSynchronizationTask() {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 19222377..6ac104ca 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -78,7 +78,7 @@ public class RicSynchronizationTask {
}
public void run(Ric ric) {
- logger.debug("Handling ric: {}", ric.getConfig().ricId());
+ logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
if (ric.getState() == RicState.SYNCHRONIZING) {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
@@ -86,22 +86,8 @@ public class RicSynchronizationTask {
}
ric.getLock().lock(LockType.EXCLUSIVE) //
- .flatMap(notUsed -> setRicState(ric)) //
- .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .flatMap(notUsed -> synchronizeRic(ric)) //
.subscribe(new BaseSubscriber<Object>() {
- @Override
- protected void hookOnError(Throwable throwable) {
- logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(),
- throwable.getMessage());
- ric.setState(RicState.UNAVAILABLE);
- }
-
- @Override
- protected void hookOnComplete() {
- onSynchronizationComplete(ric);
- }
@Override
protected void hookFinally(SignalType type) {
@@ -110,6 +96,31 @@ public class RicSynchronizationTask {
});
}
+ public Mono<Ric> synchronizeRic(Ric ric) {
+ return Mono.just(ric) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
+ .collectList() //
+ .flatMap(notUsed -> Mono.just(ric)) //
+ .doOnError(t -> { //
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
+ ric.setState(RicState.UNAVAILABLE); //
+ }) //
+ .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+ return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+ .doOnNext(ric::addSupportedPolicyType); //
+ }
+
@SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
private Mono<Ric> setRicState(Ric ric) {
synchronized (ric) {
@@ -117,6 +128,7 @@ public class RicSynchronizationTask {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
return Mono.empty();
}
+ logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
ric.setState(RicState.SYNCHRONIZING);
return Mono.just(ric);
}
@@ -141,7 +153,7 @@ public class RicSynchronizationTask {
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -158,15 +170,6 @@ public class RicSynchronizationTask {
callbacks.notifyServicesRicSynchronized(ric, services);
}
- private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyTypeIdentities() //
- .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
- .doOnNext(ric::addSupportedPolicyType); //
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
@@ -188,7 +191,7 @@ public class RicSynchronizationTask {
}
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
- logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+ logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
return a1Client.putPolicy(policy) //
.flatMapMany(notUsed -> Flux.just(policy));
}
@@ -202,8 +205,10 @@ public class RicSynchronizationTask {
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.id())) //
+ .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) //
.filter(policy -> !checkTransient(policy)) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC)
+ .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage()));
}
}