diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2022-12-23 15:44:27 +0100 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2022-12-28 10:47:11 +0100 |
commit | 123ba96ee12d73d89aac9e274ae9e3cffe7249ea (patch) | |
tree | b00fcb23885d38cdcf08b78fabaf005e953ae730 /a1-policy-management/src/main/java/org/onap | |
parent | c7f757e98066775ed2fdeb67f3d31777e8430624 (diff) |
ONAP PMS - new RICs must be locked before synch
Bugfix, new RICs must also be locked before synch. Otherwise other activities may interfere.
Improved the synch. Previously, all policies were removed from the NearRT-RIC and eventually recreated.
After this fix, only unknwon policies are removed.
Change-Id: Ic6224aeb93ef91579cfb8894329538baf1829283
Issue-ID: CCSDK-3827
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap')
11 files changed, 64 insertions, 49 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java index a691ee1e..21636a38 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java @@ -20,7 +20,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; +import java.util.Collections; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -61,7 +63,11 @@ public interface A1Client { public Mono<String> deletePolicy(Policy policy); - public Flux<String> deleteAllPolicies(); + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds); + + default Flux<String> deleteAllPolicies() { + return deleteAllPolicies(Collections.emptySet()); + } public Mono<String> getPolicyStatus(Policy policy); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java index 75ba2515..45c0bc42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java @@ -30,7 +30,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import reactor.core.publisher.Mono; /** @@ -45,7 +44,6 @@ public class A1ClientFactory { private final AsyncRestClientFactory restClientFactory; - @Autowired public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) { this.appConfig = appConfig; this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java index df6faade..9c32d79c 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; import lombok.Getter; @@ -190,15 +191,16 @@ public class CcsdkA1AdapterClient implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); // } else { A1UriBuilder uriBuilder = this.getUriBuilder(); return getPolicyTypeIdentities() // .flatMapMany(Flux::fromIterable) // - .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC); + .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC); } } @@ -207,9 +209,10 @@ public class CcsdkA1AdapterClient implements A1Client { .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString); } - private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) { + private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set<String> excludePolicyIds) { return getInstancesForType(uriBuilder, type) // - .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC); + .filter(policyId -> !excludePolicyIds.contains(policyId)) // + .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC); } @Override diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java index dfe33e7d..1d567a89 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -182,9 +183,9 @@ public class OscA1Client implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override @@ -209,8 +210,9 @@ public class OscA1Client implements A1Client { return restClient.delete(policyUri); } - private Flux<String> deletePoliciesForType(String typeId) { + private Flux<String> deletePoliciesForType(String typeId, Set<String> excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java index 5eae7759..750008d8 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.util.Arrays; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -134,8 +135,9 @@ public class StdA1ClientVersion1 implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(this::deletePolicyById); // } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java index 0022057b..24990a1a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -193,16 +194,15 @@ public class StdA1ClientVersion2 implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override public Mono<String> getPolicyStatus(Policy policy) { String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId()); return restClient.get(statusUri); - } private Flux<String> getPolicyTypeIds() { @@ -220,8 +220,9 @@ public class StdA1ClientVersion2 implements A1Client { return restClient.delete(policyUri); } - private Flux<String> deletePoliciesForType(String typeId) { + private Flux<String> deleteAllPoliciesForType(String typeId, Set<String> excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java index ff09ba3a..6f152870 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.Vector; /** @@ -58,6 +59,14 @@ public class MultiMap<T> { return new Vector<>(innerMap.values()); } + public Set<String> keySet(String key) { + Map<String, T> innerMap = this.map.get(key); + if (innerMap == null) { + return Collections.emptySet(); + } + return innerMap.keySet(); + } + public void clear() { this.map.clear(); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index 6161fbd9..77ac0f4a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -71,11 +71,9 @@ public class Policies { private MultiMap<Policy> policiesType = new MultiMap<>(); private final DataStore dataStore; - private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); public Policies(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; this.dataStore = DataStore.create(appConfig, "policies"); } @@ -139,6 +137,10 @@ public class Policies { return policiesRic.get(ric); } + public synchronized Set<String> getPolicyIdsForRic(String ricId) { + return policiesRic.keySet(ricId); + } + public synchronized Collection<Policy> getForType(String type) { return policiesType.get(type); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 408a0d5f..978ae1c0 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; @@ -43,7 +44,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -76,7 +76,7 @@ public class RefreshConfigTask { @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; - private final Rics rics; + final Rics rics; private final A1ClientFactory a1ClientFactory; private final Policies policies; private final Services services; @@ -85,7 +85,6 @@ public class RefreshConfigTask { private long fileLastModified = 0; - @Autowired public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics, Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory, SecurityContext securityContext) { @@ -161,7 +160,10 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - synchronizationTask().run(ric); + ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") // + .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) // + .doFinally(sig -> ric.getLock().unlockBlocking()) // + .subscribe(); } } @@ -176,9 +178,15 @@ public class RefreshConfigTask { if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); Ric ric = new Ric(updatedInfo.getRicConfig()); - this.addRic(ric); - return this.synchronizationTask().synchronizeRic(ric) // - .map(notUsed -> event); + + return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") // + .doOnNext(grant -> this.rics.put(ric)) // + .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) // + .collectList() // + .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) // + .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) // + .map(notUsed -> event) // + .doFinally(sig -> ric.getLock().unlockBlocking()); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -189,7 +197,7 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(new Ric(updatedInfo.getRicConfig())); + this.rics.put(new Ric(updatedInfo.getRicConfig())); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } @@ -198,12 +206,6 @@ public class RefreshConfigTask { } } - void addRic(Ric ric) { - this.rics.put(ric); - this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe(); - logger.debug("Added RIC: {}", ric.id()); - } - /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index fdeb47e2..1f612e42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -37,7 +37,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -86,7 +85,6 @@ public class RicSupervision { private final A1Client a1Client; } - @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services, ApplicationConfig config, SecurityContext securityContext) { this.rics = rics; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index b3afa7cd..d1bd433b 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -22,11 +22,12 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; +import java.util.Set; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; -import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -77,21 +78,11 @@ public class RicSynchronizationTask { this.rics = rics; } - public void run(Ric ric) { - logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId()); - - if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId()); - return; - } - - ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // - .flatMap(notUsed -> synchronizeRic(ric)) // - .doFinally(sig -> ric.getLock().unlockBlocking()) // - .subscribe(); - } - public Mono<Ric> synchronizeRic(Ric ric) { + if (ric.getLock().getLockCounter() != 1) { + logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id()); + return Mono.empty(); + } return this.a1ClientFactory.createA1Client(ric) // .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // @@ -143,7 +134,8 @@ public class RicSynchronizationTask { private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) { Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client); - Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(); + Set<String> excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id()); + Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete); Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic); |