summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-05-30 14:22:46 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2022-06-07 09:54:09 +0200
commit3f3a4d71e80ab134af52489e519f88be9786c860 (patch)
treed4bc77f7d252b06c32935bc9bc182868a88322fd /a1-policy-management/src
parentfeed7a7eb0405135c014713887536595651ab83a (diff)
NONRTRIC PMS, Sporadic instability
Attempt to stablize the synch. Issue-ID: CCSDK-3683 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech> Change-Id: Ie0f266e2eef23e91dcf6f5925a577bb930b6d9e8
Diffstat (limited to 'a1-policy-management/src')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java7
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java19
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java45
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java4
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java68
5 files changed, 53 insertions, 90 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
index cb4e2ebb..6216a4df 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
@@ -34,6 +34,7 @@ import lombok.Getter;
import org.json.JSONObject;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +133,9 @@ public class CcsdkA1AdapterClient implements A1Client {
this.controllerConfig = controllerConfig;
logger.debug("CcsdkA1AdapterClient for ric: {}, a1Controller: {}", ricConfig.getRicId(), controllerConfig);
} else {
+ logger.error("Not supported protocoltype: {}", protocolType);
throw new IllegalArgumentException("Not handeled protocolversion: " + protocolType);
}
-
}
@Override
@@ -146,7 +147,6 @@ public class CcsdkA1AdapterClient implements A1Client {
.flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString) //
.collectList();
}
-
}
@Override
@@ -173,7 +173,7 @@ public class CcsdkA1AdapterClient implements A1Client {
} else if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V2_0_0) {
return StdA1ClientVersion2.extractPolicySchema(controllerResponse, policyTypeId);
} else {
- throw new NullPointerException("Not supported");
+ return Mono.error(new ServiceException("Not supported " + this.protocolType));
}
}
@@ -234,6 +234,7 @@ public class CcsdkA1AdapterClient implements A1Client {
} else if (protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_OSC_V1) {
return new OscA1Client.UriBuilder(ricConfig);
}
+ logger.error("Not supported protocoltype: {}", protocolType);
throw new NullPointerException();
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index e4711c98..f90d462e 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -114,11 +114,12 @@ public class RicSupervision {
private Mono<RicData> checkOneRic(RicData ricData) {
if (ricData.ric.getState() == RicState.CONSISTENCY_CHECK || ricData.ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Skipping check ric: {}, state: {}", ricData.ric.id(), ricData.ric.getState());
return Mono.empty(); // Skip, already in progress
}
return ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic") //
.flatMap(lock -> synchIfUnavailable(ricData)) //
- .doOnNext(ric -> ric.ric.setState(RicState.CONSISTENCY_CHECK)) //
+ .doOnNext(ric -> ricData.ric.setState(RicState.CONSISTENCY_CHECK)) //
.flatMap(x -> checkRicPolicies(ricData)) //
.flatMap(x -> checkRicPolicyTypes(ricData)) //
.doOnNext(x -> onRicCheckedOk(ricData)) //
@@ -127,6 +128,14 @@ public class RicSupervision {
.onErrorResume(throwable -> Mono.empty());
}
+ private Mono<RicData> synchIfUnavailable(RicData ric) {
+ if (ric.ric.getState() == RicState.UNAVAILABLE) {
+ return Mono.error(new SynchNeededException(ric));
+ } else {
+ return Mono.just(ric);
+ }
+ }
+
private Mono<RicData> onRicCheckedError(Throwable t, RicData ricData) {
logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
ricData.ric.setState(RicState.UNAVAILABLE);
@@ -149,14 +158,6 @@ public class RicSupervision {
.map(a1Client -> new RicData(ric, a1Client));
}
- private Mono<RicData> synchIfUnavailable(RicData ric) {
- if (ric.ric.getState() == RicState.UNAVAILABLE) {
- return Mono.error(new SynchNeededException(ric));
- } else {
- return Mono.just(ric);
- }
- }
-
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.getClient().getPolicyIdentities() //
.flatMap(ricP -> validateInstances(ricP, ric));
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 94e873ed..2d282f33 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -93,17 +94,36 @@ public class RicSynchronizationTask {
return this.a1ClientFactory.createA1Client(ric) //
.doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
- .collectList() //
- .map(notUsed -> ric) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
ric.setState(RicState.UNAVAILABLE); //
+ deletePoliciesIfNotRecreatable(t, ric);
}) //
+ .collectList() //
.flatMap(notUsed -> onSynchronizationComplete(ric)) //
.onErrorResume(t -> Mono.just(ric));
}
+ /**
+ * If a 4xx error is received, allpolicies are deleted. This is just to avoid
+ * cyclical receovery due to that the NearRT RIC cannot accept a previously
+ * policy.
+ */
+ private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) {
+ if (throwable instanceof WebClientResponseException) {
+ WebClientResponseException responseException = (WebClientResponseException) throwable;
+ if (responseException.getStatusCode().is4xxClientError()) {
+ deleteAllPoliciesInRepository(ric);
+ }
+ }
+ }
+
+ private void deleteAllPoliciesInRepository(Ric ric) {
+ for (Policy policy : policies.getForRic(ric.id())) {
+ this.policies.remove(policy);
+ }
+ }
+
public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
return a1Client.getPolicyTypeIdentities() //
.doOnNext(x -> ric.clearSupportedPolicyTypes()) //
@@ -134,19 +154,6 @@ public class RicSynchronizationTask {
.map(list -> ric);
}
- private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
- deleteAllPoliciesInRepository(ric);
-
- Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
- Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(A1Client::deleteAllPolicies) //
- .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
-
- return Flux.concat(synchronizedTypes, deletePoliciesInRic);
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
@@ -161,12 +168,6 @@ public class RicSynchronizationTask {
return pt;
}
- private void deleteAllPoliciesInRepository(Ric ric) {
- for (Policy policy : policies.getForRic(ric.id())) {
- this.policies.remove(policy);
- }
- }
-
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId());
return a1Client.putPolicy(policy) //
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
index 875698a8..396a4063 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
@@ -157,14 +157,10 @@ class RicSupervisionTest {
doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.UNAVAILABLE);
rics.put(RIC_1);
-
RicSupervision supervisorUnderTest = spy(createRicSupervision());
-
doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
-
supervisorUnderTest.checkAllRics();
-
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
index 2e5424db..0ea0a8cb 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
@@ -56,6 +56,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -148,6 +149,21 @@ class RicSynchronizationTaskTest {
}
@Test
+ void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
+ setUpCreationOfA1Client();
+ simulateRicWithNoPolicyTypes();
+ policies.put(policy1);
+ WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null);
+ when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
+ RicSynchronizationTask synchronizerUnderTest = createTask();
+ ric1.setState(RicState.AVAILABLE);
+ synchronizerUnderTest.run(ric1);
+ await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
+ assertThat(policies.size()).isZero();
+ assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
+ }
+
+ @Test
void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
rics.put(ric1);
ric1.setState(RicState.AVAILABLE);
@@ -233,58 +249,6 @@ class RicSynchronizationTaskTest {
assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
}
- @Test
- void ricIdleAndErrorDeletingPoliciesFirstTime_thenSynchronizationWithDeletionOfPolicies() {
- ric1.setState(RicState.AVAILABLE);
- rics.put(ric1);
-
- policies.put(policy1);
-
- setUpCreationOfA1Client();
- simulateRicWithNoPolicyTypes();
-
- when(a1ClientMock.deleteAllPolicies()) //
- .thenReturn(Flux.error(new Exception("Exception"))) //
- .thenReturn(Flux.just("OK"));
-
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
- ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
- await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
-
- verify(a1ClientMock, times(2)).deleteAllPolicies();
- verifyNoMoreInteractions(a1ClientMock);
-
- assertThat(policyTypes.size()).isZero();
- assertThat(policies.size()).isZero();
- assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
- }
-
- @Test
- void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
- setUpCreationOfA1Client();
- simulateRicWithNoPolicyTypes();
-
- policies.put(policy1);
-
- String originalErrorMessage = "Exception";
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(new Exception(originalErrorMessage)));
-
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
- ric1.setState(RicState.AVAILABLE);
- synchronizerUnderTest.run(ric1);
- await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
-
- verify(a1ClientMock, times(2)).deleteAllPolicies();
- verifyNoMoreInteractions(a1ClientMock);
-
- assertThat(policyTypes.size()).isZero();
- assertThat(policies.size()).isZero();
- assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
- }
-
private void setUpCreationOfA1Client() {
when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();