diff options
Diffstat (limited to 'a1-policy-management/src/main')
11 files changed, 64 insertions, 49 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java index a691ee1e..21636a38 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java @@ -20,7 +20,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; +import java.util.Collections; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -61,7 +63,11 @@ public interface A1Client { public Mono<String> deletePolicy(Policy policy); - public Flux<String> deleteAllPolicies(); + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds); + + default Flux<String> deleteAllPolicies() { + return deleteAllPolicies(Collections.emptySet()); + } public Mono<String> getPolicyStatus(Policy policy); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java index 75ba2515..45c0bc42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java @@ -30,7 +30,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import reactor.core.publisher.Mono; /** @@ -45,7 +44,6 @@ public class A1ClientFactory { private final AsyncRestClientFactory restClientFactory; - @Autowired public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) { this.appConfig = appConfig; this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java index df6faade..9c32d79c 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; import lombok.Getter; @@ -190,15 +191,16 @@ public class CcsdkA1AdapterClient implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); // } else { A1UriBuilder uriBuilder = this.getUriBuilder(); return getPolicyTypeIdentities() // .flatMapMany(Flux::fromIterable) // - .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC); + .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC); } } @@ -207,9 +209,10 @@ public class CcsdkA1AdapterClient implements A1Client { .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString); } - private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) { + private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set<String> excludePolicyIds) { return getInstancesForType(uriBuilder, type) // - .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC); + .filter(policyId -> !excludePolicyIds.contains(policyId)) // + .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC); } @Override diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java index dfe33e7d..1d567a89 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -182,9 +183,9 @@ public class OscA1Client implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override @@ -209,8 +210,9 @@ public class OscA1Client implements A1Client { return restClient.delete(policyUri); } - private Flux<String> deletePoliciesForType(String typeId) { + private Flux<String> deletePoliciesForType(String typeId, Set<String> excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java index 5eae7759..750008d8 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.util.Arrays; import java.util.List; +import java.util.Set; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -134,8 +135,9 @@ public class StdA1ClientVersion1 implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyIds() // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(this::deletePolicyById); // } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java index 0022057b..24990a1a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; @@ -193,16 +194,15 @@ public class StdA1ClientVersion2 implements A1Client { } @Override - public Flux<String> deleteAllPolicies() { + public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); + .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC); } @Override public Mono<String> getPolicyStatus(Policy policy) { String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId()); return restClient.get(statusUri); - } private Flux<String> getPolicyTypeIds() { @@ -220,8 +220,9 @@ public class StdA1ClientVersion2 implements A1Client { return restClient.delete(policyUri); } - private Flux<String> deletePoliciesForType(String typeId) { + private Flux<String> deleteAllPoliciesForType(String typeId, Set<String> excludePolicyIds) { return getPolicyIdentitiesByType(typeId) // + .filter(policyId -> !excludePolicyIds.contains(policyId)) // .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java index ff09ba3a..6f152870 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.Vector; /** @@ -58,6 +59,14 @@ public class MultiMap<T> { return new Vector<>(innerMap.values()); } + public Set<String> keySet(String key) { + Map<String, T> innerMap = this.map.get(key); + if (innerMap == null) { + return Collections.emptySet(); + } + return innerMap.keySet(); + } + public void clear() { this.map.clear(); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index 6161fbd9..77ac0f4a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -71,11 +71,9 @@ public class Policies { private MultiMap<Policy> policiesType = new MultiMap<>(); private final DataStore dataStore; - private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); public Policies(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; this.dataStore = DataStore.create(appConfig, "policies"); } @@ -139,6 +137,10 @@ public class Policies { return policiesRic.get(ric); } + public synchronized Set<String> getPolicyIdsForRic(String ricId) { + return policiesRic.keySet(ricId); + } + public synchronized Collection<Policy> getForType(String type) { return policiesType.get(type); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 408a0d5f..978ae1c0 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; @@ -43,7 +44,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -76,7 +76,7 @@ public class RefreshConfigTask { @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; - private final Rics rics; + final Rics rics; private final A1ClientFactory a1ClientFactory; private final Policies policies; private final Services services; @@ -85,7 +85,6 @@ public class RefreshConfigTask { private long fileLastModified = 0; - @Autowired public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics, Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory, SecurityContext securityContext) { @@ -161,7 +160,10 @@ public class RefreshConfigTask { private void removePoliciciesInRic(@Nullable Ric ric) { if (ric != null) { - synchronizationTask().run(ric); + ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") // + .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) // + .doFinally(sig -> ric.getLock().unlockBlocking()) // + .subscribe(); } } @@ -176,9 +178,15 @@ public class RefreshConfigTask { if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); Ric ric = new Ric(updatedInfo.getRicConfig()); - this.addRic(ric); - return this.synchronizationTask().synchronizeRic(ric) // - .map(notUsed -> event); + + return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") // + .doOnNext(grant -> this.rics.put(ric)) // + .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) // + .collectList() // + .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) // + .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) // + .map(notUsed -> event) // + .doFinally(sig -> ric.getLock().unlockBlocking()); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -189,7 +197,7 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(new Ric(updatedInfo.getRicConfig())); + this.rics.put(new Ric(updatedInfo.getRicConfig())); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } @@ -198,12 +206,6 @@ public class RefreshConfigTask { } } - void addRic(Ric ric) { - this.rics.put(ric); - this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe(); - logger.debug("Added RIC: {}", ric.id()); - } - /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index fdeb47e2..1f612e42 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -37,7 +37,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -86,7 +85,6 @@ public class RicSupervision { private final A1Client a1Client; } - @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services, ApplicationConfig config, SecurityContext securityContext) { this.rics = rics; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index b3afa7cd..d1bd433b 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -22,11 +22,12 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; +import java.util.Set; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; -import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -77,21 +78,11 @@ public class RicSynchronizationTask { this.rics = rics; } - public void run(Ric ric) { - logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId()); - - if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId()); - return; - } - - ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // - .flatMap(notUsed -> synchronizeRic(ric)) // - .doFinally(sig -> ric.getLock().unlockBlocking()) // - .subscribe(); - } - public Mono<Ric> synchronizeRic(Ric ric) { + if (ric.getLock().getLockCounter() != 1) { + logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id()); + return Mono.empty(); + } return this.a1ClientFactory.createA1Client(ric) // .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // @@ -143,7 +134,8 @@ public class RicSynchronizationTask { private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) { Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client); - Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(); + Set<String> excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id()); + Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete); Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic); |