From b28e811178bf9d828615f62c67f30a78c0414eb1 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 14 Apr 2021 20:16:35 +0200 Subject: PMS Persistent storage of policies and type definitions - A1 Istanbul Bugfix,improved traces, avoiding synch for RICs after restart. Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e Issue-ID: CCSDK-3256 Signed-off-by: PatrikBuhr --- .../configuration/ApplicationConfig.java | 4 +- .../controllers/v1/ServiceController.java | 4 +- .../repository/Policies.java | 13 ++-- .../repository/PolicyTypes.java | 2 +- .../a1policymanagementservice/repository/Ric.java | 2 +- .../tasks/RefreshConfigTask.java | 70 +++++++++++++++------- .../tasks/RicSupervision.java | 24 +++++--- .../tasks/RicSynchronizationTask.java | 61 ++++++++++--------- 8 files changed, 113 insertions(+), 67 deletions(-) (limited to 'a1-policy-management/src/main/java/org') diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java index 5bfe677e..170ade6f 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java @@ -136,8 +136,8 @@ public class ApplicationConfig { @Getter private final Type type; - RicConfigUpdate(RicConfig ric, Type event) { - this.ricConfig = ric; + public RicConfigUpdate(RicConfig config, Type event) { + this.ricConfig = config; this.type = event; } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java index 4e42550c..2edd1e51 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java @@ -141,9 +141,9 @@ public class ServiceController { } } - @Operation(summary = "Unregisters a service") + @Operation(summary = "Unregister a service") @ApiResponses(value = { // - @ApiResponse(responseCode = "204", description = "Service unregisterred", // + @ApiResponse(responseCode = "204", description = "Service unregistered", // content = @Content(schema = @Schema(implementation = VoidResponse.class))), @ApiResponse(responseCode = "404", description = "Service not found", // content = @Content(schema = @Schema(implementation = String.class)))}) diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index a24c5bd0..45aa57e4 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -25,7 +25,6 @@ import com.google.gson.GsonBuilder; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; import java.io.PrintStream; import java.lang.invoke.MethodHandles; import java.nio.file.Files; @@ -49,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.util.FileSystemUtils; +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally @Configuration public class Policies { @@ -132,7 +132,7 @@ public class Policies { if (!policy.isTransient()) { try { Files.delete(getPath(policy)); - } catch (IOException | ServiceException e) { + } catch (Exception e) { logger.debug("Could not delete policy from database: {}", e.getMessage()); } } @@ -162,7 +162,7 @@ public class Policies { if (this.appConfig.getVardataDirectory() != null) { FileSystemUtils.deleteRecursively(getDatabasePath()); } - } catch (IOException | ServiceException e) { + } catch (Exception e) { logger.warn("Could not delete policy database : {}", e.getMessage()); } } @@ -186,8 +186,7 @@ public class Policies { return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json"); } - public void restoreFromDatabase(Ric ric, PolicyTypes types) { - + public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) { try { Files.createDirectories(getDatabasePath(ric)); for (File file : getDatabasePath(ric).toFile().listFiles()) { @@ -195,7 +194,9 @@ public class Policies { PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class); this.put(toPolicy(policyStorage, ric, types)); } - } catch (ServiceException | IOException e) { + logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), + this.policiesRic.get(ric.id()).size()); + } catch (Exception e) { logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java index 76f0e216..ad3270cb 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java @@ -119,7 +119,7 @@ public class PolicyTypes { PolicyType type = gson.fromJson(json, PolicyType.class); this.types.put(type.getId(), type); } - + logger.debug("Restored type database,no of types: {}", this.types.size()); } catch (IOException e) { logger.warn("Could not restore policy type database : {}", e.getMessage()); } catch (ServiceException e) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java index 9c4b2750..f9af2393 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java @@ -78,7 +78,7 @@ public class Ric { * @return a vector containing the nodes managed by this Ric. */ public synchronized Collection getManagedElementIds() { - return ricConfig.managedElementIds(); + return new Vector<>(ricConfig.managedElementIds()); } /** diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 771dea52..c733cb0d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; @@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable; * configuration file. */ @Component +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class RefreshConfigTask { private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class); @@ -119,14 +120,14 @@ public class RefreshConfigTask { } Flux createRefreshTask() { - Flux loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux loadFromFile = regularInterval() // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux loadFromConsul = regularInterval() // .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // .flatMap(this::getFromCbs) // @@ -135,14 +136,21 @@ public class RefreshConfigTask { .doOnNext(json -> this.isConsulUsed = true) // .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); + final int CONCURRENCY = 50; // Number of RIC synched in paralell + return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // - .flatMap(this::updateConfig) // - .doOnNext(this::handleUpdatedRicConfig) // - .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // + .flatMap(this::updateConfig, CONCURRENCY) // + .flatMap(this::handleUpdatedRicConfig) // .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } + private Flux regularInterval() { + return Flux.interval(Duration.ZERO, configRefreshInterval) // + .onBackpressureDrop() // + .limitRate(1); // Limit so that only one event is emitted at a time + } + Mono getEnvironment(Properties systemEnvironment) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // .onErrorResume(t -> Mono.empty()); @@ -192,19 +200,43 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, - restClientFactory, rics); - synch.run(ric); + synchronizationTask().run(ric); } } - private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { + private RicSynchronizationTask synchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); + } + + /** + * for an added RIC after a restart it is nesessary to get the suypported policy + * types from the RIC unless a full synchronization is wanted. + * + * @param ric the ric to get supprted types from + * @return the same ric + */ + private Mono trySyncronizeSupportedTypes(Ric ric) { + logger.debug("Synchronizing policy types for new RIC: {}", ric.id()); + // Synchronize the policy types + return this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // + .collectList() // + .flatMap(list -> Mono.just(ric)) // + .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), + t.getMessage())) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Mono handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { synchronized (this.rics) { String ricId = updatedInfo.getRicConfig().ricId(); RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - addRic(updatedInfo.getRicConfig()); + Ric ric = new Ric(updatedInfo.getRicConfig()); + return trySyncronizeSupportedTypes(ric) // + .flatMap(this::addRic) // + .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -215,27 +247,25 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(updatedInfo.getRicConfig()); + addRic(new Ric(updatedInfo.getRicConfig())).block(); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } } + return Mono.just(event); } } - private void addRic(RicConfig config) { - Ric ric = new Ric(config); + Mono addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { this.policies.restoreFromDatabase(ric, this.policyTypes); } - runRicSynchronization(ric); - } + logger.debug("Added RIC: {}", ric.id()); + + ric.setState(RicState.AVAILABLE); - void runRicSynchronization(Ric ric) { - RicSynchronizationTask synchronizationTask = - new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); - synchronizationTask.run(ric); + return Mono.just(ric); } /** diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index c13df8c3..87689012 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -55,6 +55,7 @@ import reactor.core.publisher.Mono; public class RicSupervision { private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class); + private static final int CONCURRENCY = 50; // Number of RIC checked in paralell private final Rics rics; private final Policies policies; private final PolicyTypes policyTypes; @@ -107,7 +108,7 @@ public class RicSupervision { private Flux createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkOneRic); + .flatMap(this::checkOneRic, CONCURRENCY); } private Mono checkOneRic(RicData ricData) { @@ -123,10 +124,8 @@ public class RicSupervision { private void onRicCheckedError(Throwable t, RicData ricData) { logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage()); - if (t instanceof SynchStartedException) { - // this is just a temporary state, - ricData.ric.setState(RicState.AVAILABLE); - } else { + if (!(t instanceof SynchStartedException)) { + // If synch is started, the synch will set the final state ricData.ric.setState(RicState.UNAVAILABLE); } ricData.ric.getLock().unlockBlocking(); @@ -158,6 +157,7 @@ public class RicSupervision { private Mono checkRicState(RicData ric) { if (ric.ric.getState() == RicState.UNAVAILABLE) { + logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id()); return startSynchronization(ric) // .onErrorResume(t -> Mono.empty()); } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) { @@ -175,11 +175,15 @@ public class RicSupervision { private Mono validateInstances(Collection ricPolicies, RicData ric) { synchronized (this.policies) { if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) { + logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})", + ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size()); return startSynchronization(ric); } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { + logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})", + ric.ric.id(), policyId); return startSynchronization(ric); } } @@ -194,10 +198,15 @@ public class RicSupervision { private Mono validateTypes(Collection ricTypes, RicData ric) { if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) { + logger.debug( + "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})", + ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size()); return startSynchronization(ric); } for (String typeName : ricTypes) { if (!ric.ric.isSupportingType(typeName)) { + logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})", + ric.ric.id(), typeName); return startSynchronization(ric); } } @@ -206,8 +215,9 @@ public class RicSupervision { private Mono startSynchronization(RicData ric) { RicSynchronizationTask synchronizationTask = createSynchronizationTask(); - synchronizationTask.run(ric.ric); - return Mono.error(new SynchStartedException("Syncronization started")); + return synchronizationTask.synchronizeRic(ric.ric) // + .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started"))); + } RicSynchronizationTask createSynchronizationTask() { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 19222377..6ac104ca 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -78,7 +78,7 @@ public class RicSynchronizationTask { } public void run(Ric ric) { - logger.debug("Handling ric: {}", ric.getConfig().ricId()); + logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId()); if (ric.getState() == RicState.SYNCHRONIZING) { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); @@ -86,22 +86,8 @@ public class RicSynchronizationTask { } ric.getLock().lock(LockType.EXCLUSIVE) // - .flatMap(notUsed -> setRicState(ric)) // - .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // - .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) + .flatMap(notUsed -> synchronizeRic(ric)) // .subscribe(new BaseSubscriber() { - @Override - protected void hookOnError(Throwable throwable) { - logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), - throwable.getMessage()); - ric.setState(RicState.UNAVAILABLE); - } - - @Override - protected void hookOnComplete() { - onSynchronizationComplete(ric); - } @Override protected void hookFinally(SignalType type) { @@ -110,6 +96,31 @@ public class RicSynchronizationTask { }); } + public Mono synchronizeRic(Ric ric) { + return Mono.just(ric) // + .flatMap(notUsed -> setRicState(ric)) // + .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // + .flatMapMany(client -> runSynchronization(ric, client)) // + .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // + .collectList() // + .flatMap(notUsed -> Mono.just(ric)) // + .doOnError(t -> { // + logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // + ric.setState(RicState.UNAVAILABLE); // + }) // + .doOnNext(notUsed -> onSynchronizationComplete(ric)) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Flux synchronizePolicyTypes(Ric ric, A1Client a1Client) { + return a1Client.getPolicyTypeIdentities() // + .doOnNext(x -> ric.clearSupportedPolicyTypes()) // + .flatMapMany(Flux::fromIterable) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // + .doOnNext(ric::addSupportedPolicyType); // + } + @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields private Mono setRicState(Ric ric) { synchronized (ric) { @@ -117,6 +128,7 @@ public class RicSynchronizationTask { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); return Mono.empty(); } + logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId()); ric.setState(RicState.SYNCHRONIZING); return Mono.just(ric); } @@ -141,7 +153,7 @@ public class RicSynchronizationTask { } private Flux deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); + logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); deleteAllPoliciesInRepository(ric); Flux synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // @@ -158,15 +170,6 @@ public class RicSynchronizationTask { callbacks.notifyServicesRicSynchronized(ric, services); } - private Flux synchronizePolicyTypes(Ric ric, A1Client a1Client) { - return a1Client.getPolicyTypeIdentities() // - .doOnNext(x -> ric.clearSupportedPolicyTypes()) // - .flatMapMany(Flux::fromIterable) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // - .doOnNext(ric::addSupportedPolicyType); // - } - private Mono getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -188,7 +191,7 @@ public class RicSynchronizationTask { } private Flux putPolicy(Policy policy, Ric ric, A1Client a1Client) { - logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); + logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); return a1Client.putPolicy(policy) // .flatMapMany(notUsed -> Flux.just(policy)); } @@ -202,8 +205,10 @@ public class RicSynchronizationTask { private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.id())) // + .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) // .filter(policy -> !checkTransient(policy)) // - .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC) + .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage())); } } -- cgit 1.2.3-korg