From 123ba96ee12d73d89aac9e274ae9e3cffe7249ea Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 23 Dec 2022 15:44:27 +0100 Subject: ONAP PMS - new RICs must be locked before synch Bugfix, new RICs must also be locked before synch. Otherwise other activities may interfere. Improved the synch. Previously, all policies were removed from the NearRT-RIC and eventually recreated. After this fix, only unknwon policies are removed. Change-Id: Ic6224aeb93ef91579cfb8894329538baf1829283 Issue-ID: CCSDK-3827 Signed-off-by: PatrikBuhr --- .../clients/A1Client.java | 8 ++- .../clients/A1ClientFactory.java | 2 - .../clients/CcsdkA1AdapterClient.java | 11 ++-- .../clients/OscA1Client.java | 8 +-- .../clients/StdA1ClientVersion1.java | 4 +- .../clients/StdA1ClientVersion2.java | 9 ++-- .../repository/MultiMap.java | 9 ++++ .../repository/Policies.java | 6 ++- .../tasks/RefreshConfigTask.java | 30 ++++++----- .../tasks/RicSupervision.java | 2 - .../tasks/RicSynchronizationTask.java | 24 +++------ .../tasks/RefreshConfigTaskTest.java | 4 +- .../tasks/RicSynchronizationTaskTest.java | 62 +++++++--------------- .../utils/MockA1Client.java | 3 +- 14 files changed, 88 insertions(+), 94 deletions(-) (limited to 'a1-policy-management/src') diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java index a691ee1e..21636a38 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java @@ -20,7 +20,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; +import java.util.Collections; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -61,7 +63,11 @@ public interface A1Client { public Mono deletePolicy(Policy policy); - public Flux deleteAllPolicies(); + public Flux deleteAllPolicies(Set excludePolicyIds); + + default Flux deleteAllPolicies() { + return deleteAllPolicies(Collections.emptySet()); + } public Mono getPolicyStatus(Policy policy); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java index 75ba2515..45c0bc42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java @@ -30,7 +30,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import reactor.core.publisher.Mono; /** @@ -45,7 +44,6 @@ public class A1ClientFactory { private final AsyncRestClientFactory restClientFactory; - @Autowired public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) { this.appConfig = appConfig; this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java index df6faade..9c32d79c 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; import lombok.Getter; @@ -190,15 +191,16 @@ public class CcsdkA1AdapterClient implements A1Client { } @Override - public Flux deleteAllPolicies() { + public Flux deleteAllPolicies(Set excludePolicyIds) { if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); // } else { A1UriBuilder uriBuilder = this.getUriBuilder(); return getPolicyTypeIdentities() // .flatMapMany(Flux::fromIterable) // - .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC); + .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC); } } @@ -207,9 +209,10 @@ public class CcsdkA1AdapterClient implements A1Client { .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString); } - private Flux deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) { + private Flux deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set excludePolicyIds) { return getInstancesForType(uriBuilder, type) // - .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC); + .filter(policyId -> !excludePolicyIds.contains(policyId)) // + .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC); } @Override diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java index dfe33e7d..1d567a89 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -182,9 +183,9 @@ public class OscA1Client implements A1Client { } @Override - public Flux deleteAllPolicies() { + public Flux deleteAllPolicies(Set excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override @@ -209,8 +210,9 @@ public class OscA1Client implements A1Client { return restClient.delete(policyUri); } - private Flux deletePoliciesForType(String typeId) { + private Flux deletePoliciesForType(String typeId, Set excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java index 5eae7759..750008d8 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.util.Arrays; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -134,8 +135,9 @@ public class StdA1ClientVersion1 implements A1Client { } @Override - public Flux deleteAllPolicies() { + public Flux deleteAllPolicies(Set excludePolicyIds) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(this::deletePolicyById); // } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java index 0022057b..24990a1a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -193,16 +194,15 @@ public class StdA1ClientVersion2 implements A1Client { } @Override - public Flux deleteAllPolicies() { + public Flux deleteAllPolicies(Set excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override public Mono getPolicyStatus(Policy policy) { String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId()); return restClient.get(statusUri); - } private Flux getPolicyTypeIds() { @@ -220,8 +220,9 @@ public class StdA1ClientVersion2 implements A1Client { return restClient.delete(policyUri); } - private Flux deletePoliciesForType(String typeId) { + private Flux deleteAllPoliciesForType(String typeId, Set excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java index ff09ba3a..6f152870 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.Vector; /** @@ -58,6 +59,14 @@ public class MultiMap { return new Vector<>(innerMap.values()); } + public Set keySet(String key) { + Map innerMap = this.map.get(key); + if (innerMap == null) { + return Collections.emptySet(); + } + return innerMap.keySet(); + } + public void clear() { this.map.clear(); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index 6161fbd9..77ac0f4a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -71,11 +71,9 @@ public class Policies { private MultiMap policiesType = new MultiMap<>(); private final DataStore dataStore; - private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); public Policies(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; this.dataStore = DataStore.create(appConfig, "policies"); } @@ -139,6 +137,10 @@ public class Policies { return policiesRic.get(ric); } + public synchronized Set getPolicyIdsForRic(String ricId) { + return policiesRic.keySet(ricId); + } + public synchronized Collection getForType(String type) { return policiesType.get(type); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 408a0d5f..978ae1c0 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; @@ -43,7 +44,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -76,7 +76,7 @@ public class RefreshConfigTask { @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; - private final Rics rics; + final Rics rics; private final A1ClientFactory a1ClientFactory; private final Policies policies; private final Services services; @@ -85,7 +85,6 @@ public class RefreshConfigTask { private long fileLastModified = 0; - @Autowired public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics, Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory, SecurityContext securityContext) { @@ -161,7 +160,10 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - synchronizationTask().run(ric); + ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") // + .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) // + .doFinally(sig -> ric.getLock().unlockBlocking()) // + .subscribe(); } } @@ -176,9 +178,15 @@ public class RefreshConfigTask { if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); Ric ric = new Ric(updatedInfo.getRicConfig()); - this.addRic(ric); - return this.synchronizationTask().synchronizeRic(ric) // - .map(notUsed -> event); + + return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") // + .doOnNext(grant -> this.rics.put(ric)) // + .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) // + .collectList() // + .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) // + .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) // + .map(notUsed -> event) // + .doFinally(sig -> ric.getLock().unlockBlocking()); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -189,7 +197,7 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(new Ric(updatedInfo.getRicConfig())); + this.rics.put(new Ric(updatedInfo.getRicConfig())); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } @@ -198,12 +206,6 @@ public class RefreshConfigTask { } } - void addRic(Ric ric) { - this.rics.put(ric); - this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe(); - logger.debug("Added RIC: {}", ric.id()); - } - /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index fdeb47e2..1f612e42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -37,7 +37,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -86,7 +85,6 @@ public class RicSupervision { private final A1Client a1Client; } - @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services, ApplicationConfig config, SecurityContext securityContext) { this.rics = rics; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index b3afa7cd..d1bd433b 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -22,11 +22,12 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; +import java.util.Set; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; -import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -77,21 +78,11 @@ public class RicSynchronizationTask { this.rics = rics; } - public void run(Ric ric) { - logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId()); - - if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId()); - return; - } - - ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // - .flatMap(notUsed -> synchronizeRic(ric)) // - .doFinally(sig -> ric.getLock().unlockBlocking()) // - .subscribe(); - } - public Mono synchronizeRic(Ric ric) { + if (ric.getLock().getLockCounter() != 1) { + logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id()); + return Mono.empty(); + } return this.a1ClientFactory.createA1Client(ric) // .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // @@ -143,7 +134,8 @@ public class RicSynchronizationTask { private Flux runSynchronization(Ric ric, A1Client a1Client) { Flux synchronizedTypes = synchronizePolicyTypes(ric, a1Client); - Flux policiesDeletedInRic = a1Client.deleteAllPolicies(); + Set excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id()); + Flux policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete); Flux policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java index 2ecb9c28..45f2bcbc 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java @@ -81,7 +81,7 @@ class RefreshConfigTaskTest { .build(); private RefreshConfigTask createTestObject(boolean configFileExists) { - return createTestObject(configFileExists, new Rics(), new Policies(appConfig), true); + return createTestObject(configFileExists, spy(new Rics()), new Policies(appConfig), true); } private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies, @@ -116,7 +116,7 @@ class RefreshConfigTaskTest { // Then verify(refreshTaskUnderTest, atLeastOnce()).loadConfigurationFromFile(); - verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class)); + verify(refreshTaskUnderTest.rics, times(2)).put(any(Ric.class)); Iterable ricConfigs = appConfig.getRicConfigs(); RicConfig ricConfig = ricConfigs.iterator().next(); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java index 6386441c..ae4e92bc 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java @@ -23,11 +23,10 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -48,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -130,36 +130,15 @@ class RicSynchronizationTaskTest { rics); }; - @Test - void ricAlreadySynchronizing_thenNoSynchronization() { - ric1.setState(RicState.SYNCHRONIZING); - ric1.addSupportedPolicyType(POLICY_TYPE_1); - - policyTypes.put(POLICY_TYPE_1); - policies.put(policy1); - - RicSynchronizationTask synchronizerUnderTest = createTask(); - - synchronizerUnderTest.run(ric1); - - verifyNoInteractions(a1ClientMock); - - assertThat(policyTypes.size()).isEqualTo(1); - assertThat(policies.size()).isEqualTo(1); - assertThat(ric1.getState()).isEqualTo(RicState.SYNCHRONIZING); - assertThat(ric1.getSupportedPolicyTypeNames()).hasSize(1); - } - @Test void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() { setUpCreationOfA1Client(); simulateRicWithNoPolicyTypes(); policies.put(policy1); WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null); - when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception)); - RicSynchronizationTask synchronizerUnderTest = createTask(); + when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception)); ric1.setState(RicState.AVAILABLE); - synchronizerUnderTest.run(ric1); + runSynch(ric1); await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState())); assertThat(policies.size()).isZero(); assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE); @@ -172,10 +151,9 @@ class RicSynchronizationTaskTest { policies.put(policy1); WebClientRequestException exception = new WebClientRequestException(new ServiceException("x"), null, null, null); - when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception)); - RicSynchronizationTask synchronizerUnderTest = createTask(); + when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception)); ric1.setState(RicState.AVAILABLE); - synchronizerUnderTest.run(ric1); + runSynch(ric1); await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState())); } @@ -193,17 +171,13 @@ class RicSynchronizationTaskTest { setUpCreationOfA1Client(); simulateRicWithOnePolicyType(); - RicSynchronizationTask synchronizerUnderTest = spy(createTask()); - ric1.setState(RicState.UNAVAILABLE); - synchronizerUnderTest.run(ric1); + runSynch(ric1); await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState())); verify(a1ClientMock, times(1)).getPolicyTypeIdentities(); verifyNoMoreInteractions(a1ClientMock); - verify(synchronizerUnderTest).run(ric1); - assertThat(policyTypes.size()).isEqualTo(1); assertThat(policies.size()).isZero(); assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE); @@ -219,10 +193,8 @@ class RicSynchronizationTaskTest { String typeSchema = "schema"; when(a1ClientMock.getPolicyTypeSchema(POLICY_TYPE_1_NAME)).thenReturn(Mono.just(typeSchema)); - RicSynchronizationTask synchronizerUnderTest = createTask(); - ric1.setState(RicState.UNAVAILABLE); - synchronizerUnderTest.run(ric1); + runSynch(ric1); await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState())); verify(a1ClientMock).getPolicyTypeIdentities(); @@ -247,16 +219,14 @@ class RicSynchronizationTaskTest { setUpCreationOfA1Client(); simulateRicWithNoPolicyTypes(); - when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.just("OK")); + when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.just("OK")); when(a1ClientMock.putPolicy(any(Policy.class))).thenReturn(Mono.just("OK")); - RicSynchronizationTask synchronizerUnderTest = createTask(); - ric1.setState(RicState.UNAVAILABLE); - synchronizerUnderTest.run(ric1); + runSynch(ric1); await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState())); - verify(a1ClientMock).deleteAllPolicies(); + verify(a1ClientMock).deleteAllPolicies(anySet()); verify(a1ClientMock).putPolicy(policy1); verifyNoMoreInteractions(a1ClientMock); @@ -265,9 +235,17 @@ class RicSynchronizationTaskTest { assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE); } + private void runSynch(Ric ric) { + RicSynchronizationTask synchronizerUnderTest = createTask(); + ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // + .flatMap(notUsed -> synchronizerUnderTest.synchronizeRic(ric)) // + .doFinally(sig -> ric.getLock().unlockBlocking()) // + .block(); + } + private void setUpCreationOfA1Client() { when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock)); - doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(); + doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(anySet()); } private void simulateRicWithOnePolicyType() { diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java index b76f1e72..80bfff72 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; +import java.util.Set; import java.util.Vector; import lombok.Setter; @@ -112,7 +113,7 @@ public class MockA1Client implements A1Client { } @Override - public Flux deleteAllPolicies() { + public Flux deleteAllPolicies(Set excludePolicyId) { this.policies.clear(); return mono("OK") // .flatMapMany(Flux::just); -- cgit 1.2.3-korg