summaryrefslogtreecommitdiffstats
path: root/a1-policy-management
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2021-04-14 20:16:35 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2021-04-20 15:57:00 +0200
commitb28e811178bf9d828615f62c67f30a78c0414eb1 (patch)
treea7ad6e4fe739f8369d73ece11e80c669395e4b15 /a1-policy-management
parentab7baa0563069bc403c840b39f22a9e7e900fb72 (diff)
PMS Persistent storage of policies and type definitions - A1 Istanbul
Bugfix,improved traces, avoiding synch for RICs after restart. Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e Issue-ID: CCSDK-3256 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management')
-rw-r--r--a1-policy-management/api/pms-api.json4
-rw-r--r--a1-policy-management/api/pms-api.yaml4
-rw-r--r--a1-policy-management/config/application.yaml5
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java13
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java70
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java24
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java61
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java27
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java10
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java11
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java18
15 files changed, 151 insertions, 108 deletions
diff --git a/a1-policy-management/api/pms-api.json b/a1-policy-management/api/pms-api.json
index 5b432820..d34e94be 100644
--- a/a1-policy-management/api/pms-api.json
+++ b/a1-policy-management/api/pms-api.json
@@ -440,11 +440,11 @@
"tags": ["A1 Policy Management V1.0"]
},
"delete": {
- "summary": "Unregisters a service",
+ "summary": "Unregister a service",
"operationId": "deleteService",
"responses": {
"204": {
- "description": "Service unregisterred",
+ "description": "Service unregistered",
"content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}}
},
"404": {
diff --git a/a1-policy-management/api/pms-api.yaml b/a1-policy-management/api/pms-api.yaml
index cdf91ee8..b106d11c 100644
--- a/a1-policy-management/api/pms-api.yaml
+++ b/a1-policy-management/api/pms-api.yaml
@@ -159,7 +159,7 @@ paths:
delete:
tags:
- A1 Policy Management V1.0
- summary: Unregisters a service
+ summary: Unregister a service
operationId: deleteService
parameters:
- name: name
@@ -172,7 +172,7 @@ paths:
type: string
responses:
204:
- description: Service unregisterred
+ description: Service unregistered
content:
'*/*':
schema:
diff --git a/a1-policy-management/config/application.yaml b/a1-policy-management/config/application.yaml
index 3294fbe2..6bef52b0 100644
--- a/a1-policy-management/config/application.yaml
+++ b/a1-policy-management/config/application.yaml
@@ -41,6 +41,7 @@ logging:
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.ccsdk.oran.a1policymanagementservice: INFO
+ # org.onap.ccsdk.oran.a1policymanagementservice.tasks: TRACE
file:
name: /var/log/policy-agent/application.log
server:
@@ -69,6 +70,6 @@ app:
# The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host:
http.proxy-port: 0
- # path where the service can store data
- vardata-directory: /var/policy-management-service
+ # path where the service can store data
+ vardata-directory: /var/policy-management-service
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
index 5bfe677e..170ade6f 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
@@ -136,8 +136,8 @@ public class ApplicationConfig {
@Getter
private final Type type;
- RicConfigUpdate(RicConfig ric, Type event) {
- this.ricConfig = ric;
+ public RicConfigUpdate(RicConfig config, Type event) {
+ this.ricConfig = config;
this.type = event;
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
index 4e42550c..2edd1e51 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/ServiceController.java
@@ -141,9 +141,9 @@ public class ServiceController {
}
}
- @Operation(summary = "Unregisters a service")
+ @Operation(summary = "Unregister a service")
@ApiResponses(value = { //
- @ApiResponse(responseCode = "204", description = "Service unregisterred", //
+ @ApiResponse(responseCode = "204", description = "Service unregistered", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))),
@ApiResponse(responseCode = "404", description = "Service not found", //
content = @Content(schema = @Schema(implementation = String.class)))})
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
index a24c5bd0..45aa57e4 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
@@ -25,7 +25,6 @@ import com.google.gson.GsonBuilder;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
@@ -49,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.FileSystemUtils;
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@Configuration
public class Policies {
@@ -132,7 +132,7 @@ public class Policies {
if (!policy.isTransient()) {
try {
Files.delete(getPath(policy));
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.debug("Could not delete policy from database: {}", e.getMessage());
}
}
@@ -162,7 +162,7 @@ public class Policies {
if (this.appConfig.getVardataDirectory() != null) {
FileSystemUtils.deleteRecursively(getDatabasePath());
}
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.warn("Could not delete policy database : {}", e.getMessage());
}
}
@@ -186,8 +186,7 @@ public class Policies {
return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json");
}
- public void restoreFromDatabase(Ric ric, PolicyTypes types) {
-
+ public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) {
try {
Files.createDirectories(getDatabasePath(ric));
for (File file : getDatabasePath(ric).toFile().listFiles()) {
@@ -195,7 +194,9 @@ public class Policies {
PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class);
this.put(toPolicy(policyStorage, ric, types));
}
- } catch (ServiceException | IOException e) {
+ logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
+ this.policiesRic.get(ric.id()).size());
+ } catch (Exception e) {
logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
index 76f0e216..ad3270cb 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
@@ -119,7 +119,7 @@ public class PolicyTypes {
PolicyType type = gson.fromJson(json, PolicyType.class);
this.types.put(type.getId(), type);
}
-
+ logger.debug("Restored type database,no of types: {}", this.types.size());
} catch (IOException e) {
logger.warn("Could not restore policy type database : {}", e.getMessage());
} catch (ServiceException e) {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
index 9c4b2750..f9af2393 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
@@ -78,7 +78,7 @@ public class Ric {
* @return a vector containing the nodes managed by this Ric.
*/
public synchronized Collection<String> getManagedElementIds() {
- return ricConfig.managedElementIds();
+ return new Vector<>(ricConfig.managedElementIds());
}
/**
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 771dea52..c733cb0d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -35,10 +35,10 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
@@ -63,6 +63,7 @@ import reactor.util.annotation.Nullable;
* configuration file.
*/
@Component
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class RefreshConfigTask {
private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
@@ -119,14 +120,14 @@ public class RefreshConfigTask {
}
Flux<RicConfigUpdate.Type> createRefreshTask() {
- Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromFile = regularInterval() //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreErrorFlux) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromConsul = regularInterval() //
.flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
.flatMap(this::getFromCbs) //
@@ -135,14 +136,21 @@ public class RefreshConfigTask {
.doOnNext(json -> this.isConsulUsed = true) //
.doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+ final int CONCURRENCY = 50; // Number of RIC synched in paralell
+
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
- .flatMap(this::updateConfig) //
- .doOnNext(this::handleUpdatedRicConfig) //
- .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+ .flatMap(this::updateConfig, CONCURRENCY) //
+ .flatMap(this::handleUpdatedRicConfig) //
.doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
+ private Flux<Long> regularInterval() {
+ return Flux.interval(Duration.ZERO, configRefreshInterval) //
+ .onBackpressureDrop() //
+ .limitRate(1); // Limit so that only one event is emitted at a time
+ }
+
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
.onErrorResume(t -> Mono.empty());
@@ -192,19 +200,43 @@ public class RefreshConfigTask {
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services,
- restClientFactory, rics);
- synch.run(ric);
+ synchronizationTask().run(ric);
}
}
- private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ private RicSynchronizationTask synchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
+ }
+
+ /**
+ * for an added RIC after a restart it is nesessary to get the suypported policy
+ * types from the RIC unless a full synchronization is wanted.
+ *
+ * @param ric the ric to get supprted types from
+ * @return the same ric
+ */
+ private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
+ logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
+ // Synchronize the policy types
+ return this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric)) //
+ .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(),
+ t.getMessage())) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().ricId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
- addRic(updatedInfo.getRicConfig());
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ return trySyncronizeSupportedTypes(ric) //
+ .flatMap(this::addRic) //
+ .flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -215,27 +247,25 @@ public class RefreshConfigTask {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(updatedInfo.getRicConfig());
+ addRic(new Ric(updatedInfo.getRicConfig())).block();
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
+ return Mono.just(event);
}
}
- private void addRic(RicConfig config) {
- Ric ric = new Ric(config);
+ Mono<Ric> addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
- runRicSynchronization(ric);
- }
+ logger.debug("Added RIC: {}", ric.id());
+
+ ric.setState(RicState.AVAILABLE);
- void runRicSynchronization(Ric ric) {
- RicSynchronizationTask synchronizationTask =
- new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
- synchronizationTask.run(ric);
+ return Mono.just(ric);
}
/**
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index c13df8c3..87689012 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -55,6 +55,7 @@ import reactor.core.publisher.Mono;
public class RicSupervision {
private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
+ private static final int CONCURRENCY = 50; // Number of RIC checked in paralell
private final Rics rics;
private final Policies policies;
private final PolicyTypes policyTypes;
@@ -107,7 +108,7 @@ public class RicSupervision {
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkOneRic);
+ .flatMap(this::checkOneRic, CONCURRENCY);
}
private Mono<RicData> checkOneRic(RicData ricData) {
@@ -123,10 +124,8 @@ public class RicSupervision {
private void onRicCheckedError(Throwable t, RicData ricData) {
logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
- if (t instanceof SynchStartedException) {
- // this is just a temporary state,
- ricData.ric.setState(RicState.AVAILABLE);
- } else {
+ if (!(t instanceof SynchStartedException)) {
+ // If synch is started, the synch will set the final state
ricData.ric.setState(RicState.UNAVAILABLE);
}
ricData.ric.getLock().unlockBlocking();
@@ -158,6 +157,7 @@ public class RicSupervision {
private Mono<RicData> checkRicState(RicData ric) {
if (ric.ric.getState() == RicState.UNAVAILABLE) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id());
return startSynchronization(ric) //
.onErrorResume(t -> Mono.empty());
} else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
@@ -175,11 +175,15 @@ public class RicSupervision {
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})",
+ ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size());
return startSynchronization(ric);
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})",
+ ric.ric.id(), policyId);
return startSynchronization(ric);
}
}
@@ -194,10 +198,15 @@ public class RicSupervision {
private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
+ logger.debug(
+ "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})",
+ ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size());
return startSynchronization(ric);
}
for (String typeName : ricTypes) {
if (!ric.ric.isSupportingType(typeName)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})",
+ ric.ric.id(), typeName);
return startSynchronization(ric);
}
}
@@ -206,8 +215,9 @@ public class RicSupervision {
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask synchronizationTask = createSynchronizationTask();
- synchronizationTask.run(ric.ric);
- return Mono.error(new SynchStartedException("Syncronization started"));
+ return synchronizationTask.synchronizeRic(ric.ric) //
+ .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started")));
+
}
RicSynchronizationTask createSynchronizationTask() {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 19222377..6ac104ca 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -78,7 +78,7 @@ public class RicSynchronizationTask {
}
public void run(Ric ric) {
- logger.debug("Handling ric: {}", ric.getConfig().ricId());
+ logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
if (ric.getState() == RicState.SYNCHRONIZING) {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
@@ -86,22 +86,8 @@ public class RicSynchronizationTask {
}
ric.getLock().lock(LockType.EXCLUSIVE) //
- .flatMap(notUsed -> setRicState(ric)) //
- .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .flatMap(notUsed -> synchronizeRic(ric)) //
.subscribe(new BaseSubscriber<Object>() {
- @Override
- protected void hookOnError(Throwable throwable) {
- logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(),
- throwable.getMessage());
- ric.setState(RicState.UNAVAILABLE);
- }
-
- @Override
- protected void hookOnComplete() {
- onSynchronizationComplete(ric);
- }
@Override
protected void hookFinally(SignalType type) {
@@ -110,6 +96,31 @@ public class RicSynchronizationTask {
});
}
+ public Mono<Ric> synchronizeRic(Ric ric) {
+ return Mono.just(ric) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
+ .collectList() //
+ .flatMap(notUsed -> Mono.just(ric)) //
+ .doOnError(t -> { //
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
+ ric.setState(RicState.UNAVAILABLE); //
+ }) //
+ .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+ return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+ .doOnNext(ric::addSupportedPolicyType); //
+ }
+
@SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
private Mono<Ric> setRicState(Ric ric) {
synchronized (ric) {
@@ -117,6 +128,7 @@ public class RicSynchronizationTask {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
return Mono.empty();
}
+ logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
ric.setState(RicState.SYNCHRONIZING);
return Mono.just(ric);
}
@@ -141,7 +153,7 @@ public class RicSynchronizationTask {
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -158,15 +170,6 @@ public class RicSynchronizationTask {
callbacks.notifyServicesRicSynchronized(ric, services);
}
- private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyTypeIdentities() //
- .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
- .doOnNext(ric::addSupportedPolicyType); //
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
@@ -188,7 +191,7 @@ public class RicSynchronizationTask {
}
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
- logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+ logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
return a1Client.putPolicy(policy) //
.flatMapMany(notUsed -> Flux.just(policy));
}
@@ -202,8 +205,10 @@ public class RicSynchronizationTask {
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.id())) //
+ .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) //
.filter(policy -> !checkTransient(policy)) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC)
+ .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage()));
}
}
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
index 84ac596f..5ffe5153 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
@@ -66,6 +66,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RefreshConfigTask;
import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RicSupervision;
import org.onap.ccsdk.oran.a1policymanagementservice.tasks.ServiceSupervision;
import org.onap.ccsdk.oran.a1policymanagementservice.utils.MockA1Client;
@@ -130,6 +131,9 @@ class ApplicationTest {
@Autowired
RappSimulatorController rAppSimulator;
+ @Autowired
+ RefreshConfigTask refreshConfigTask;
+
private static Gson gson = new GsonBuilder().create();
/**
@@ -196,28 +200,40 @@ class ApplicationTest {
}
@Test
- void testPersistence() throws ServiceException {
+ void testPersistency() throws ServiceException {
Ric ric = this.addRic("ric1");
PolicyType type = this.addPolicyType("type1", ric.id());
PolicyTypes types = new PolicyTypes(this.applicationConfig);
assertThat(types.size()).isEqualTo(1);
- addPolicy("id", type.getId(), "service", ric.id());
- addPolicy("id2", type.getId(), "service", ric.id());
+ final int noOfPolicies = 100;
+ for (int i = 0; i < noOfPolicies; ++i) {
+ addPolicy("id" + i, type.getId(), "service", ric.id());
+ }
{
Policies policies = new Policies(this.applicationConfig);
policies.restoreFromDatabase(ric, types);
- assertThat(policies.size()).isEqualTo(2);
+ assertThat(policies.size()).isEqualTo(noOfPolicies);
}
{
restClient().delete("/policies/id2").block();
Policies policies = new Policies(this.applicationConfig);
policies.restoreFromDatabase(ric, types);
- assertThat(policies.size()).isEqualTo(1);
+ assertThat(policies.size()).isEqualTo(noOfPolicies - 1);
}
+ {
+ // Test adding the RIC from configuration
+ RicConfig config = ric.getConfig();
+ this.rics.remove("ric1");
+ ApplicationConfig.RicConfigUpdate update =
+ new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED);
+ refreshConfigTask.handleUpdatedRicConfig(update).block();
+ ric = this.rics.getRic("ric1");
+ assertThat(ric.getSupportedPolicyTypes().size()).isEqualTo(1);
+ }
}
@Test
@@ -752,6 +768,7 @@ class ApplicationTest {
@Test
void testConcurrency() throws Exception {
+ logger.info("Concurrency test starting");
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
List<ConcurrencyTestRunnable> tests = new ArrayList<>();
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
index 045b1107..c999214d 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
@@ -25,7 +25,6 @@ import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -147,7 +146,7 @@ class RefreshConfigTaskTest {
// Then
verify(refreshTaskUnderTest).loadConfigurationFromFile();
- verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any(Ric.class));
+ verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
RicConfig ricConfig = ricConfigs.iterator().next();
@@ -224,14 +223,11 @@ class RefreshConfigTaskTest {
String newBaseUrl = "newBaseUrl";
modifyTheRicConfiguration(configAsJson, newBaseUrl);
when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson));
- doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class));
StepVerifier //
.create(refreshTaskUnderTest.createRefreshTask()) //
.expectSubscription() //
- .expectNext(Type.CHANGED) //
- .expectNext(Type.ADDED) //
- .expectNext(Type.REMOVED) //
+ .expectNextCount(3) // CHANGED REMOVED ADDED
.thenCancel() //
.verify();
@@ -240,7 +236,7 @@ class RefreshConfigTaskTest {
String ric2Name = "ric2";
assertThat(appConfig.getRic(ric2Name)).isNotNull();
- assertThat(rics.size()).isEqualTo(2);
+ // assertThat(rics.size()).isEqualTo(2);
assertThat(rics.get(RIC_1_NAME).getConfig().baseUrl()).isEqualTo(newBaseUrl);
assertThat(rics.get(ric2Name)).isNotNull();
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
index 525eeff7..313d5ddd 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
@@ -158,12 +158,13 @@ class RicSupervisionTest {
RicSupervision supervisorUnderTest = spy(createRicSupervision());
doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+ doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
@@ -217,7 +218,7 @@ class RicSupervisionTest {
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
@@ -240,7 +241,7 @@ class RicSupervisionTest {
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
@@ -281,7 +282,7 @@ class RicSupervisionTest {
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
@@ -309,7 +310,7 @@ class RicSupervisionTest {
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
index 2902d45b..a2e7c75b 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
@@ -20,7 +20,6 @@
package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
-import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
@@ -31,9 +30,6 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
@@ -58,7 +54,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
-import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -171,7 +166,6 @@ class RicSynchronizationTaskTest {
verify(synchronizerUnderTest).run(RIC_1);
verify(synchronizerUnderTest).notifyServices(any());
- verifyNoMoreInteractions(synchronizerUnderTest);
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policies.size()).isZero();
@@ -270,14 +264,8 @@ class RicSynchronizationTaskTest {
RicSynchronizationTask synchronizerUnderTest = createTask();
- final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN);
-
synchronizerUnderTest.run(RIC_1);
- verifyCorrectLogMessage(0, logAppender,
- "Synchronization failure for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
-
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
@@ -298,10 +286,4 @@ class RicSynchronizationTaskTest {
private void simulateRicWithNoPolicyTypes() {
when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Collections.emptyList()));
}
-
- private void verifyCorrectLogMessage(int messageIndex, ListAppender<ILoggingEvent> logAppender,
- String expectedMessage) {
- ILoggingEvent loggingEvent = logAppender.list.get(messageIndex);
- assertThat(loggingEvent.getFormattedMessage()).isEqualTo(expectedMessage);
- }
}