diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2021-04-14 20:16:35 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2021-04-20 15:57:00 +0200 |
commit | b28e811178bf9d828615f62c67f30a78c0414eb1 (patch) | |
tree | a7ad6e4fe739f8369d73ece11e80c669395e4b15 /a1-policy-management | |
parent | ab7baa0563069bc403c840b39f22a9e7e900fb72 (diff) |
PMS Persistent storage of policies and type definitions - A1 Istanbul
Bugfix,improved traces, avoiding synch for RICs after restart.
Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e
Issue-ID: CCSDK-3256
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management')
15 files changed, 151 insertions, 108 deletions
diff --git a/a1-policy-management/api/pms-api.json b/a1-policy-management/api/pms-api.json index 5b432820..d34e94be 100644 --- a/a1-policy-management/api/pms-api.json +++ b/a1-policy-management/api/pms-api.json @@ -440,11 +440,11 @@ "tags": ["A1 Policy Management V1.0"] }, "delete": { - "summary": "Unregisters a service", + "summary": "Unregister a service", "operationId": "deleteService", "responses": { "204": { - "description": "Service unregisterred", + "description": "Service unregistered", "content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}} }, "404": { diff --git a/a1-policy-management/api/pms-api.yaml b/a1-policy-management/api/pms-api.yaml index cdf91ee8..b106d11c 100644 --- a/a1-policy-management/api/pms-api.yaml +++ b/a1-policy-management/api/pms-api.yaml @@ -159,7 +159,7 @@ paths: delete: tags: - A1 Policy Management V1.0 - summary: Unregisters a service + summary: Unregister a service operationId: deleteService parameters: - name: name @@ -172,7 +172,7 @@ paths: type: string responses: 204: - description: Service unregisterred + description: Service unregistered content: '*/*': schema: diff --git a/a1-policy-management/config/application.yaml b/a1-policy-management/config/application.yaml index 3294fbe2..6bef52b0 100644 --- a/a1-policy-management/config/application.yaml +++ b/a1-policy-management/config/application.yaml @@ -41,6 +41,7 @@ logging: org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR org.onap.ccsdk.oran.a1policymanagementservice: INFO + # org.onap.ccsdk.oran.a1policymanagementservice.tasks: TRACE file: name: /var/log/policy-agent/application.log server: @@ -69,6 +70,6 @@ app: # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s http.proxy-host: http.proxy-port: 0 - # path where the service can store data - vardata-directory: /var/policy-management-service + # path where the service can store data + vardata-directory: /var/policy-management-service diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java index 5bfe677e..170ade6f 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java @@ -136,8 +136,8 @@ public class ApplicationConfig { @Getter private final Type type; - RicConfigUpdate(RicConfig ric, Type event) { - this.ricConfig = ric; + public RicConfigUpdate(RicConfig config, Type event) { + this.ricConfig = config; this.type = event; } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java index 4e42550c..2edd1e51 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java @@ -141,9 +141,9 @@ public class ServiceController { } } - @Operation(summary = "Unregisters a service") + @Operation(summary = "Unregister a service") @ApiResponses(value = { // - @ApiResponse(responseCode = "204", description = "Service unregisterred", // + @ApiResponse(responseCode = "204", description = "Service unregistered", // content = @Content(schema = @Schema(implementation = VoidResponse.class))), @ApiResponse(responseCode = "404", description = "Service not found", // content = @Content(schema = @Schema(implementation = String.class)))}) diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index a24c5bd0..45aa57e4 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -25,7 +25,6 @@ import com.google.gson.GsonBuilder; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; import java.io.PrintStream; import java.lang.invoke.MethodHandles; import java.nio.file.Files; @@ -49,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.util.FileSystemUtils; +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally @Configuration public class Policies { @@ -132,7 +132,7 @@ public class Policies { if (!policy.isTransient()) { try { Files.delete(getPath(policy)); - } catch (IOException | ServiceException e) { + } catch (Exception e) { logger.debug("Could not delete policy from database: {}", e.getMessage()); } } @@ -162,7 +162,7 @@ public class Policies { if (this.appConfig.getVardataDirectory() != null) { FileSystemUtils.deleteRecursively(getDatabasePath()); } - } catch (IOException | ServiceException e) { + } catch (Exception e) { logger.warn("Could not delete policy database : {}", e.getMessage()); } } @@ -186,8 +186,7 @@ public class Policies { return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json"); } - public void restoreFromDatabase(Ric ric, PolicyTypes types) { - + public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) { try { Files.createDirectories(getDatabasePath(ric)); for (File file : getDatabasePath(ric).toFile().listFiles()) { @@ -195,7 +194,9 @@ public class Policies { PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class); this.put(toPolicy(policyStorage, ric, types)); } - } catch (ServiceException | IOException e) { + logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), + this.policiesRic.get(ric.id()).size()); + } catch (Exception e) { logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java index 76f0e216..ad3270cb 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java @@ -119,7 +119,7 @@ public class PolicyTypes { PolicyType type = gson.fromJson(json, PolicyType.class); this.types.put(type.getId(), type); } - + logger.debug("Restored type database,no of types: {}", this.types.size()); } catch (IOException e) { logger.warn("Could not restore policy type database : {}", e.getMessage()); } catch (ServiceException e) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java index 9c4b2750..f9af2393 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java @@ -78,7 +78,7 @@ public class Ric { * @return a vector containing the nodes managed by this Ric. */ public synchronized Collection<String> getManagedElementIds() { - return ricConfig.managedElementIds(); + return new Vector<>(ricConfig.managedElementIds()); } /** diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 771dea52..c733cb0d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; @@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable; * configuration file. */ @Component +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class RefreshConfigTask { private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class); @@ -119,14 +120,14 @@ public class RefreshConfigTask { } Flux<RicConfigUpdate.Type> createRefreshTask() { - Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux<JsonObject> loadFromFile = regularInterval() // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) // + Flux<JsonObject> loadFromConsul = regularInterval() // .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // .flatMap(this::getFromCbs) // @@ -135,14 +136,21 @@ public class RefreshConfigTask { .doOnNext(json -> this.isConsulUsed = true) // .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); + final int CONCURRENCY = 50; // Number of RIC synched in paralell + return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // - .flatMap(this::updateConfig) // - .doOnNext(this::handleUpdatedRicConfig) // - .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // + .flatMap(this::updateConfig, CONCURRENCY) // + .flatMap(this::handleUpdatedRicConfig) // .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } + private Flux<Long> regularInterval() { + return Flux.interval(Duration.ZERO, configRefreshInterval) // + .onBackpressureDrop() // + .limitRate(1); // Limit so that only one event is emitted at a time + } + Mono<EnvProperties> getEnvironment(Properties systemEnvironment) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // .onErrorResume(t -> Mono.empty()); @@ -192,19 +200,43 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, - restClientFactory, rics); - synch.run(ric); + synchronizationTask().run(ric); } } - private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { + private RicSynchronizationTask synchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); + } + + /** + * for an added RIC after a restart it is nesessary to get the suypported policy + * types from the RIC unless a full synchronization is wanted. + * + * @param ric the ric to get supprted types from + * @return the same ric + */ + private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) { + logger.debug("Synchronizing policy types for new RIC: {}", ric.id()); + // Synchronize the policy types + return this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // + .collectList() // + .flatMap(list -> Mono.just(ric)) // + .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), + t.getMessage())) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { synchronized (this.rics) { String ricId = updatedInfo.getRicConfig().ricId(); RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - addRic(updatedInfo.getRicConfig()); + Ric ric = new Ric(updatedInfo.getRicConfig()); + return trySyncronizeSupportedTypes(ric) // + .flatMap(this::addRic) // + .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -215,27 +247,25 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(updatedInfo.getRicConfig()); + addRic(new Ric(updatedInfo.getRicConfig())).block(); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } } + return Mono.just(event); } } - private void addRic(RicConfig config) { - Ric ric = new Ric(config); + Mono<Ric> addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { this.policies.restoreFromDatabase(ric, this.policyTypes); } - runRicSynchronization(ric); - } + logger.debug("Added RIC: {}", ric.id()); + + ric.setState(RicState.AVAILABLE); - void runRicSynchronization(Ric ric) { - RicSynchronizationTask synchronizationTask = - new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); - synchronizationTask.run(ric); + return Mono.just(ric); } /** diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index c13df8c3..87689012 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -55,6 +55,7 @@ import reactor.core.publisher.Mono; public class RicSupervision { private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class); + private static final int CONCURRENCY = 50; // Number of RIC checked in paralell private final Rics rics; private final Policies policies; private final PolicyTypes policyTypes; @@ -107,7 +108,7 @@ public class RicSupervision { private Flux<RicData> createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkOneRic); + .flatMap(this::checkOneRic, CONCURRENCY); } private Mono<RicData> checkOneRic(RicData ricData) { @@ -123,10 +124,8 @@ public class RicSupervision { private void onRicCheckedError(Throwable t, RicData ricData) { logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage()); - if (t instanceof SynchStartedException) { - // this is just a temporary state, - ricData.ric.setState(RicState.AVAILABLE); - } else { + if (!(t instanceof SynchStartedException)) { + // If synch is started, the synch will set the final state ricData.ric.setState(RicState.UNAVAILABLE); } ricData.ric.getLock().unlockBlocking(); @@ -158,6 +157,7 @@ public class RicSupervision { private Mono<RicData> checkRicState(RicData ric) { if (ric.ric.getState() == RicState.UNAVAILABLE) { + logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id()); return startSynchronization(ric) // .onErrorResume(t -> Mono.empty()); } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) { @@ -175,11 +175,15 @@ public class RicSupervision { private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) { synchronized (this.policies) { if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) { + logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})", + ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size()); return startSynchronization(ric); } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { + logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})", + ric.ric.id(), policyId); return startSynchronization(ric); } } @@ -194,10 +198,15 @@ public class RicSupervision { private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) { if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) { + logger.debug( + "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})", + ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size()); return startSynchronization(ric); } for (String typeName : ricTypes) { if (!ric.ric.isSupportingType(typeName)) { + logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})", + ric.ric.id(), typeName); return startSynchronization(ric); } } @@ -206,8 +215,9 @@ public class RicSupervision { private Mono<RicData> startSynchronization(RicData ric) { RicSynchronizationTask synchronizationTask = createSynchronizationTask(); - synchronizationTask.run(ric.ric); - return Mono.error(new SynchStartedException("Syncronization started")); + return synchronizationTask.synchronizeRic(ric.ric) // + .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started"))); + } RicSynchronizationTask createSynchronizationTask() { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 19222377..6ac104ca 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -78,7 +78,7 @@ public class RicSynchronizationTask { } public void run(Ric ric) { - logger.debug("Handling ric: {}", ric.getConfig().ricId()); + logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId()); if (ric.getState() == RicState.SYNCHRONIZING) { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); @@ -86,22 +86,8 @@ public class RicSynchronizationTask { } ric.getLock().lock(LockType.EXCLUSIVE) // - .flatMap(notUsed -> setRicState(ric)) // - .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // - .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) + .flatMap(notUsed -> synchronizeRic(ric)) // .subscribe(new BaseSubscriber<Object>() { - @Override - protected void hookOnError(Throwable throwable) { - logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), - throwable.getMessage()); - ric.setState(RicState.UNAVAILABLE); - } - - @Override - protected void hookOnComplete() { - onSynchronizationComplete(ric); - } @Override protected void hookFinally(SignalType type) { @@ -110,6 +96,31 @@ public class RicSynchronizationTask { }); } + public Mono<Ric> synchronizeRic(Ric ric) { + return Mono.just(ric) // + .flatMap(notUsed -> setRicState(ric)) // + .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // + .flatMapMany(client -> runSynchronization(ric, client)) // + .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // + .collectList() // + .flatMap(notUsed -> Mono.just(ric)) // + .doOnError(t -> { // + logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // + ric.setState(RicState.UNAVAILABLE); // + }) // + .doOnNext(notUsed -> onSynchronizationComplete(ric)) // + .onErrorResume(t -> Mono.just(ric)); + } + + public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { + return a1Client.getPolicyTypeIdentities() // + .doOnNext(x -> ric.clearSupportedPolicyTypes()) // + .flatMapMany(Flux::fromIterable) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // + .doOnNext(ric::addSupportedPolicyType); // + } + @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields private Mono<Ric> setRicState(Ric ric) { synchronized (ric) { @@ -117,6 +128,7 @@ public class RicSynchronizationTask { logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId()); return Mono.empty(); } + logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId()); ric.setState(RicState.SYNCHRONIZING); return Mono.just(ric); } @@ -141,7 +153,7 @@ public class RicSynchronizationTask { } private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); + logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); deleteAllPoliciesInRepository(ric); Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // @@ -158,15 +170,6 @@ public class RicSynchronizationTask { callbacks.notifyServicesRicSynchronized(ric, services); } - private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { - return a1Client.getPolicyTypeIdentities() // - .doOnNext(x -> ric.clearSupportedPolicyTypes()) // - .flatMapMany(Flux::fromIterable) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // - .doOnNext(ric::addSupportedPolicyType); // - } - private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -188,7 +191,7 @@ public class RicSynchronizationTask { } private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) { - logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); + logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId()); return a1Client.putPolicy(policy) // .flatMapMany(notUsed -> Flux.just(policy)); } @@ -202,8 +205,10 @@ public class RicSynchronizationTask { private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.id())) // + .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) // .filter(policy -> !checkTransient(policy)) // - .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC) + .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage())); } } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java index 84ac596f..5ffe5153 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java @@ -66,6 +66,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; +import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RefreshConfigTask; import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RicSupervision; import org.onap.ccsdk.oran.a1policymanagementservice.tasks.ServiceSupervision; import org.onap.ccsdk.oran.a1policymanagementservice.utils.MockA1Client; @@ -130,6 +131,9 @@ class ApplicationTest { @Autowired RappSimulatorController rAppSimulator; + @Autowired + RefreshConfigTask refreshConfigTask; + private static Gson gson = new GsonBuilder().create(); /** @@ -196,28 +200,40 @@ class ApplicationTest { } @Test - void testPersistence() throws ServiceException { + void testPersistency() throws ServiceException { Ric ric = this.addRic("ric1"); PolicyType type = this.addPolicyType("type1", ric.id()); PolicyTypes types = new PolicyTypes(this.applicationConfig); assertThat(types.size()).isEqualTo(1); - addPolicy("id", type.getId(), "service", ric.id()); - addPolicy("id2", type.getId(), "service", ric.id()); + final int noOfPolicies = 100; + for (int i = 0; i < noOfPolicies; ++i) { + addPolicy("id" + i, type.getId(), "service", ric.id()); + } { Policies policies = new Policies(this.applicationConfig); policies.restoreFromDatabase(ric, types); - assertThat(policies.size()).isEqualTo(2); + assertThat(policies.size()).isEqualTo(noOfPolicies); } { restClient().delete("/policies/id2").block(); Policies policies = new Policies(this.applicationConfig); policies.restoreFromDatabase(ric, types); - assertThat(policies.size()).isEqualTo(1); + assertThat(policies.size()).isEqualTo(noOfPolicies - 1); } + { + // Test adding the RIC from configuration + RicConfig config = ric.getConfig(); + this.rics.remove("ric1"); + ApplicationConfig.RicConfigUpdate update = + new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED); + refreshConfigTask.handleUpdatedRicConfig(update).block(); + ric = this.rics.getRic("ric1"); + assertThat(ric.getSupportedPolicyTypes().size()).isEqualTo(1); + } } @Test @@ -752,6 +768,7 @@ class ApplicationTest { @Test void testConcurrency() throws Exception { + logger.info("Concurrency test starting"); final Instant startTime = Instant.now(); List<Thread> threads = new ArrayList<>(); List<ConcurrencyTestRunnable> tests = new ArrayList<>(); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java index 045b1107..c999214d 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java @@ -25,7 +25,6 @@ import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -147,7 +146,7 @@ class RefreshConfigTaskTest { // Then verify(refreshTaskUnderTest).loadConfigurationFromFile(); - verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any(Ric.class)); + verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class)); Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs(); RicConfig ricConfig = ricConfigs.iterator().next(); @@ -224,14 +223,11 @@ class RefreshConfigTaskTest { String newBaseUrl = "newBaseUrl"; modifyTheRicConfiguration(configAsJson, newBaseUrl); when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson)); - doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class)); StepVerifier // .create(refreshTaskUnderTest.createRefreshTask()) // .expectSubscription() // - .expectNext(Type.CHANGED) // - .expectNext(Type.ADDED) // - .expectNext(Type.REMOVED) // + .expectNextCount(3) // CHANGED REMOVED ADDED .thenCancel() // .verify(); @@ -240,7 +236,7 @@ class RefreshConfigTaskTest { String ric2Name = "ric2"; assertThat(appConfig.getRic(ric2Name)).isNotNull(); - assertThat(rics.size()).isEqualTo(2); + // assertThat(rics.size()).isEqualTo(2); assertThat(rics.get(RIC_1_NAME).getConfig().baseUrl()).isEqualTo(newBaseUrl); assertThat(rics.get(ric2Name)).isNotNull(); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java index 525eeff7..313d5ddd 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java @@ -158,12 +158,13 @@ class RicSupervisionTest { RicSupervision supervisorUnderTest = spy(createRicSupervision()); doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask(); + doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any()); supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); - verify(synchronizationTaskMock).run(RIC_1); + verify(synchronizationTaskMock).synchronizeRic(RIC_1); verifyNoMoreInteractions(supervisorUnderTest); } @@ -217,7 +218,7 @@ class RicSupervisionTest { verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); - verify(synchronizationTaskMock).run(RIC_1); + verify(synchronizationTaskMock).synchronizeRic(RIC_1); verifyNoMoreInteractions(supervisorUnderTest); } @@ -240,7 +241,7 @@ class RicSupervisionTest { verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); - verify(synchronizationTaskMock).run(RIC_1); + verify(synchronizationTaskMock).synchronizeRic(RIC_1); verifyNoMoreInteractions(supervisorUnderTest); } @@ -281,7 +282,7 @@ class RicSupervisionTest { verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); - verify(synchronizationTaskMock).run(RIC_1); + verify(synchronizationTaskMock).synchronizeRic(RIC_1); verifyNoMoreInteractions(supervisorUnderTest); } @@ -309,7 +310,7 @@ class RicSupervisionTest { verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); - verify(synchronizationTaskMock).run(RIC_1); + verify(synchronizationTaskMock).synchronizeRic(RIC_1); verifyNoMoreInteractions(supervisorUnderTest); } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java index 2902d45b..a2e7c75b 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java @@ -20,7 +20,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; -import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -31,9 +30,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; - import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -58,7 +54,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; -import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -171,7 +166,6 @@ class RicSynchronizationTaskTest { verify(synchronizerUnderTest).run(RIC_1); verify(synchronizerUnderTest).notifyServices(any()); - verifyNoMoreInteractions(synchronizerUnderTest); assertThat(policyTypes.size()).isEqualTo(1); assertThat(policies.size()).isZero(); @@ -270,14 +264,8 @@ class RicSynchronizationTaskTest { RicSynchronizationTask synchronizerUnderTest = createTask(); - final ListAppender<ILoggingEvent> logAppender = - LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN); - synchronizerUnderTest.run(RIC_1); - verifyCorrectLogMessage(0, logAppender, - "Synchronization failure for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage); - verify(a1ClientMock, times(2)).deleteAllPolicies(); verifyNoMoreInteractions(a1ClientMock); @@ -298,10 +286,4 @@ class RicSynchronizationTaskTest { private void simulateRicWithNoPolicyTypes() { when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Collections.emptyList())); } - - private void verifyCorrectLogMessage(int messageIndex, ListAppender<ILoggingEvent> logAppender, - String expectedMessage) { - ILoggingEvent loggingEvent = logAppender.list.get(messageIndex); - assertThat(loggingEvent.getFormattedMessage()).isEqualTo(expectedMessage); - } } |