aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-12-23 15:44:27 +0100
committerPatrikBuhr <patrik.buhr@est.tech>2022-12-28 10:47:11 +0100
commit123ba96ee12d73d89aac9e274ae9e3cffe7249ea (patch)
treeb00fcb23885d38cdcf08b78fabaf005e953ae730
parentc7f757e98066775ed2fdeb67f3d31777e8430624 (diff)
ONAP PMS - new RICs must be locked before synch
Bugfix, new RICs must also be locked before synch. Otherwise other activities may interfere. Improved the synch. Previously, all policies were removed from the NearRT-RIC and eventually recreated. After this fix, only unknwon policies are removed. Change-Id: Ic6224aeb93ef91579cfb8894329538baf1829283 Issue-ID: CCSDK-3827 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java8
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java11
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java8
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java9
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java9
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java6
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java30
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java24
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java4
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java62
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java3
14 files changed, 88 insertions, 94 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
index a691ee1e..21636a38 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
@@ -20,7 +20,9 @@
package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
@@ -61,7 +63,11 @@ public interface A1Client {
public Mono<String> deletePolicy(Policy policy);
- public Flux<String> deleteAllPolicies();
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds);
+
+ default Flux<String> deleteAllPolicies() {
+ return deleteAllPolicies(Collections.emptySet());
+ }
public Mono<String> getPolicyStatus(Policy policy);
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
index 75ba2515..45c0bc42 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
@@ -30,7 +30,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
/**
@@ -45,7 +44,6 @@ public class A1ClientFactory {
private final AsyncRestClientFactory restClientFactory;
- @Autowired
public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) {
this.appConfig = appConfig;
this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext);
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
index df6faade..9c32d79c 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import lombok.Getter;
@@ -190,15 +191,16 @@ public class CcsdkA1AdapterClient implements A1Client {
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) {
return getPolicyIds() //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
} else {
A1UriBuilder uriBuilder = this.getUriBuilder();
return getPolicyTypeIdentities() //
.flatMapMany(Flux::fromIterable) //
- .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
+ .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC);
}
}
@@ -207,9 +209,10 @@ public class CcsdkA1AdapterClient implements A1Client {
.flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString);
}
- private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) {
+ private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set<String> excludePolicyIds) {
return getInstancesForType(uriBuilder, type) //
- .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
+ .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC);
}
@Override
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
index dfe33e7d..1d567a89 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.Set;
import org.json.JSONObject;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
@@ -182,9 +183,9 @@ public class OscA1Client implements A1Client {
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+ .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
}
@Override
@@ -209,8 +210,9 @@ public class OscA1Client implements A1Client {
return restClient.delete(policyUri);
}
- private Flux<String> deletePoliciesForType(String typeId) {
+ private Flux<String> deletePoliciesForType(String typeId, Set<String> excludePolicyIds) {
return getPolicyIdentitiesByType(typeId) //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
index 5eae7759..750008d8 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
@@ -134,8 +135,9 @@ public class StdA1ClientVersion1 implements A1Client {
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyIds() //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(this::deletePolicyById); //
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java
index 0022057b..24990a1a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.Set;
import org.json.JSONObject;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
@@ -193,16 +194,15 @@ public class StdA1ClientVersion2 implements A1Client {
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+ .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
}
@Override
public Mono<String> getPolicyStatus(Policy policy) {
String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId());
return restClient.get(statusUri);
-
}
private Flux<String> getPolicyTypeIds() {
@@ -220,8 +220,9 @@ public class StdA1ClientVersion2 implements A1Client {
return restClient.delete(policyUri);
}
- private Flux<String> deletePoliciesForType(String typeId) {
+ private Flux<String> deleteAllPoliciesForType(String typeId, Set<String> excludePolicyIds) {
return getPolicyIdentitiesByType(typeId) //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java
index ff09ba3a..6f152870 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
/**
@@ -58,6 +59,14 @@ public class MultiMap<T> {
return new Vector<>(innerMap.values());
}
+ public Set<String> keySet(String key) {
+ Map<String, T> innerMap = this.map.get(key);
+ if (innerMap == null) {
+ return Collections.emptySet();
+ }
+ return innerMap.keySet();
+ }
+
public void clear() {
this.map.clear();
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
index 6161fbd9..77ac0f4a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
@@ -71,11 +71,9 @@ public class Policies {
private MultiMap<Policy> policiesType = new MultiMap<>();
private final DataStore dataStore;
- private final ApplicationConfig appConfig;
private static Gson gson = new GsonBuilder().create();
public Policies(@Autowired ApplicationConfig appConfig) {
- this.appConfig = appConfig;
this.dataStore = DataStore.create(appConfig, "policies");
}
@@ -139,6 +137,10 @@ public class Policies {
return policiesRic.get(ric);
}
+ public synchronized Set<String> getPolicyIdsForRic(String ricId) {
+ return policiesRic.keySet(ricId);
+ }
+
public synchronized Collection<Policy> getForType(String type) {
return policiesType.get(type);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 408a0d5f..978ae1c0 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
@@ -43,7 +44,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -76,7 +76,7 @@ public class RefreshConfigTask {
@Getter(AccessLevel.PROTECTED)
private Disposable refreshTask = null;
- private final Rics rics;
+ final Rics rics;
private final A1ClientFactory a1ClientFactory;
private final Policies policies;
private final Services services;
@@ -85,7 +85,6 @@ public class RefreshConfigTask {
private long fileLastModified = 0;
- @Autowired
public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics,
Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory,
SecurityContext securityContext) {
@@ -161,7 +160,10 @@ public class RefreshConfigTask {
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- synchronizationTask().run(ric);
+ ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") //
+ .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) //
+ .doFinally(sig -> ric.getLock().unlockBlocking()) //
+ .subscribe();
}
}
@@ -176,9 +178,15 @@ public class RefreshConfigTask {
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
Ric ric = new Ric(updatedInfo.getRicConfig());
- this.addRic(ric);
- return this.synchronizationTask().synchronizeRic(ric) //
- .map(notUsed -> event);
+
+ return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") //
+ .doOnNext(grant -> this.rics.put(ric)) //
+ .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) //
+ .collectList() //
+ .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) //
+ .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) //
+ .map(notUsed -> event) //
+ .doFinally(sig -> ric.getLock().unlockBlocking());
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -189,7 +197,7 @@ public class RefreshConfigTask {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(new Ric(updatedInfo.getRicConfig()));
+ this.rics.put(new Ric(updatedInfo.getRicConfig()));
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
@@ -198,12 +206,6 @@ public class RefreshConfigTask {
}
}
- void addRic(Ric ric) {
- this.rics.put(ric);
- this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe();
- logger.debug("Added RIC: {}", ric.id());
- }
-
/**
* Reads the configuration from file.
*/
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index fdeb47e2..1f612e42 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -37,7 +37,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -86,7 +85,6 @@ public class RicSupervision {
private final A1Client a1Client;
}
- @Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services, ApplicationConfig config, SecurityContext securityContext) {
this.rics = rics;
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index b3afa7cd..d1bd433b 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -22,11 +22,12 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
+import java.util.Set;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -77,21 +78,11 @@ public class RicSynchronizationTask {
this.rics = rics;
}
- public void run(Ric ric) {
- logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId());
-
- if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId());
- return;
- }
-
- ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
- .flatMap(notUsed -> synchronizeRic(ric)) //
- .doFinally(sig -> ric.getLock().unlockBlocking()) //
- .subscribe();
- }
-
public Mono<Ric> synchronizeRic(Ric ric) {
+ if (ric.getLock().getLockCounter() != 1) {
+ logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id());
+ return Mono.empty();
+ }
return this.a1ClientFactory.createA1Client(ric) //
.doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
@@ -143,7 +134,8 @@ public class RicSynchronizationTask {
private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
- Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+ Set<String> excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id());
+ Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete);
Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
index 2ecb9c28..45f2bcbc 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
@@ -81,7 +81,7 @@ class RefreshConfigTaskTest {
.build();
private RefreshConfigTask createTestObject(boolean configFileExists) {
- return createTestObject(configFileExists, new Rics(), new Policies(appConfig), true);
+ return createTestObject(configFileExists, spy(new Rics()), new Policies(appConfig), true);
}
private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
@@ -116,7 +116,7 @@ class RefreshConfigTaskTest {
// Then
verify(refreshTaskUnderTest, atLeastOnce()).loadConfigurationFromFile();
- verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
+ verify(refreshTaskUnderTest.rics, times(2)).put(any(Ric.class));
Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
RicConfig ricConfig = ricConfigs.iterator().next();
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
index 6386441c..ae4e92bc 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
@@ -23,11 +23,10 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -48,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -131,35 +131,14 @@ class RicSynchronizationTaskTest {
};
@Test
- void ricAlreadySynchronizing_thenNoSynchronization() {
- ric1.setState(RicState.SYNCHRONIZING);
- ric1.addSupportedPolicyType(POLICY_TYPE_1);
-
- policyTypes.put(POLICY_TYPE_1);
- policies.put(policy1);
-
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
- synchronizerUnderTest.run(ric1);
-
- verifyNoInteractions(a1ClientMock);
-
- assertThat(policyTypes.size()).isEqualTo(1);
- assertThat(policies.size()).isEqualTo(1);
- assertThat(ric1.getState()).isEqualTo(RicState.SYNCHRONIZING);
- assertThat(ric1.getSupportedPolicyTypeNames()).hasSize(1);
- }
-
- @Test
void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
setUpCreationOfA1Client();
simulateRicWithNoPolicyTypes();
policies.put(policy1);
WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null);
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
- RicSynchronizationTask synchronizerUnderTest = createTask();
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
ric1.setState(RicState.AVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
assertThat(policies.size()).isZero();
assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
@@ -172,10 +151,9 @@ class RicSynchronizationTaskTest {
policies.put(policy1);
WebClientRequestException exception =
new WebClientRequestException(new ServiceException("x"), null, null, null);
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
- RicSynchronizationTask synchronizerUnderTest = createTask();
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
ric1.setState(RicState.AVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
}
@@ -193,17 +171,13 @@ class RicSynchronizationTaskTest {
setUpCreationOfA1Client();
simulateRicWithOnePolicyType();
- RicSynchronizationTask synchronizerUnderTest = spy(createTask());
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
verify(a1ClientMock, times(1)).getPolicyTypeIdentities();
verifyNoMoreInteractions(a1ClientMock);
- verify(synchronizerUnderTest).run(ric1);
-
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policies.size()).isZero();
assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
@@ -219,10 +193,8 @@ class RicSynchronizationTaskTest {
String typeSchema = "schema";
when(a1ClientMock.getPolicyTypeSchema(POLICY_TYPE_1_NAME)).thenReturn(Mono.just(typeSchema));
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
verify(a1ClientMock).getPolicyTypeIdentities();
@@ -247,16 +219,14 @@ class RicSynchronizationTaskTest {
setUpCreationOfA1Client();
simulateRicWithNoPolicyTypes();
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.just("OK"));
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.just("OK"));
when(a1ClientMock.putPolicy(any(Policy.class))).thenReturn(Mono.just("OK"));
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
- verify(a1ClientMock).deleteAllPolicies();
+ verify(a1ClientMock).deleteAllPolicies(anySet());
verify(a1ClientMock).putPolicy(policy1);
verifyNoMoreInteractions(a1ClientMock);
@@ -265,9 +235,17 @@ class RicSynchronizationTaskTest {
assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
}
+ private void runSynch(Ric ric) {
+ RicSynchronizationTask synchronizerUnderTest = createTask();
+ ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
+ .flatMap(notUsed -> synchronizerUnderTest.synchronizeRic(ric)) //
+ .doFinally(sig -> ric.getLock().unlockBlocking()) //
+ .block();
+ }
+
private void setUpCreationOfA1Client() {
when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
- doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
+ doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(anySet());
}
private void simulateRicWithOnePolicyType() {
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
index b76f1e72..80bfff72 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
+import java.util.Set;
import java.util.Vector;
import lombok.Setter;
@@ -112,7 +113,7 @@ public class MockA1Client implements A1Client {
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyId) {
this.policies.clear();
return mono("OK") //
.flatMapMany(Flux::just);