diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2022-05-30 14:22:46 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2022-06-07 09:54:09 +0200 |
commit | 3f3a4d71e80ab134af52489e519f88be9786c860 (patch) | |
tree | d4bc77f7d252b06c32935bc9bc182868a88322fd | |
parent | feed7a7eb0405135c014713887536595651ab83a (diff) |
NONRTRIC PMS, Sporadic instability
Attempt to stablize the synch.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: Ie0f266e2eef23e91dcf6f5925a577bb930b6d9e8
7 files changed, 59 insertions, 101 deletions
diff --git a/a1-policy-management/config/application.yaml b/a1-policy-management/config/application.yaml index 7628624d..f0f53375 100644 --- a/a1-policy-management/config/application.yaml +++ b/a1-policy-management/config/application.yaml @@ -56,6 +56,8 @@ server: key-store: /opt/app/policy-agent/etc/cert/keystore.jks key-password: policy_agent key-alias: policy_agent + # trust-store-password: + # trust-store: app: # Location of the component configuration file. filepath: /opt/app/policy-agent/data/application_configuration.json @@ -80,4 +82,3 @@ app: # A file containing an authorization token, which shall be inserted in each HTTP header (authorization). # If the file name is empty, no authorization token is sent. auth-token-file: - diff --git a/a1-policy-management/pom.xml b/a1-policy-management/pom.xml index 8808771f..04d65ce5 100644 --- a/a1-policy-management/pom.xml +++ b/a1-policy-management/pom.xml @@ -35,7 +35,7 @@ <java.version.source>11</java.version.source> <java.version.target>11</java.version.target> <springfox.version>3.0.0</springfox.version> - <immutable.version>2.9.0</immutable.version> + <gson.version>2.9.0</gson.version> <json.version>20220320</json.version> <formatter-maven-plugin.version>2.13.0</formatter-maven-plugin.version> <spotless-maven-plugin.version>2.5.0</spotless-maven-plugin.version> @@ -122,15 +122,9 @@ <artifactId>guava</artifactId> </dependency> <dependency> - <groupId>org.immutables</groupId> - <artifactId>value</artifactId> - <version>${immutable.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.immutables</groupId> + <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>${immutable.version}</version> + <version>${gson.version}</version> </dependency> <dependency> <groupId>org.json</groupId> @@ -430,4 +424,4 @@ </plugin> </plugins> </build> -</project> +</project>
\ No newline at end of file diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java index cb4e2ebb..6216a4df 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java @@ -34,6 +34,7 @@ import lombok.Getter; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,9 +133,9 @@ public class CcsdkA1AdapterClient implements A1Client { this.controllerConfig = controllerConfig; logger.debug("CcsdkA1AdapterClient for ric: {}, a1Controller: {}", ricConfig.getRicId(), controllerConfig); } else { + logger.error("Not supported protocoltype: {}", protocolType); throw new IllegalArgumentException("Not handeled protocolversion: " + protocolType); } - } @Override @@ -146,7 +147,6 @@ public class CcsdkA1AdapterClient implements A1Client { .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString) // .collectList(); } - } @Override @@ -173,7 +173,7 @@ public class CcsdkA1AdapterClient implements A1Client { } else if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V2_0_0) { return StdA1ClientVersion2.extractPolicySchema(controllerResponse, policyTypeId); } else { - throw new NullPointerException("Not supported"); + return Mono.error(new ServiceException("Not supported " + this.protocolType)); } } @@ -234,6 +234,7 @@ public class CcsdkA1AdapterClient implements A1Client { } else if (protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_OSC_V1) { return new OscA1Client.UriBuilder(ricConfig); } + logger.error("Not supported protocoltype: {}", protocolType); throw new NullPointerException(); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index e4711c98..f90d462e 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -114,11 +114,12 @@ public class RicSupervision { private Mono<RicData> checkOneRic(RicData ricData) { if (ricData.ric.getState() == RicState.CONSISTENCY_CHECK || ricData.ric.getState() == RicState.SYNCHRONIZING) { + logger.debug("Skipping check ric: {}, state: {}", ricData.ric.id(), ricData.ric.getState()); return Mono.empty(); // Skip, already in progress } return ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic") // .flatMap(lock -> synchIfUnavailable(ricData)) // - .doOnNext(ric -> ric.ric.setState(RicState.CONSISTENCY_CHECK)) // + .doOnNext(ric -> ricData.ric.setState(RicState.CONSISTENCY_CHECK)) // .flatMap(x -> checkRicPolicies(ricData)) // .flatMap(x -> checkRicPolicyTypes(ricData)) // .doOnNext(x -> onRicCheckedOk(ricData)) // @@ -127,6 +128,14 @@ public class RicSupervision { .onErrorResume(throwable -> Mono.empty()); } + private Mono<RicData> synchIfUnavailable(RicData ric) { + if (ric.ric.getState() == RicState.UNAVAILABLE) { + return Mono.error(new SynchNeededException(ric)); + } else { + return Mono.just(ric); + } + } + private Mono<RicData> onRicCheckedError(Throwable t, RicData ricData) { logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage()); ricData.ric.setState(RicState.UNAVAILABLE); @@ -149,14 +158,6 @@ public class RicSupervision { .map(a1Client -> new RicData(ric, a1Client)); } - private Mono<RicData> synchIfUnavailable(RicData ric) { - if (ric.ric.getState() == RicState.UNAVAILABLE) { - return Mono.error(new SynchNeededException(ric)); - } else { - return Mono.just(ric); - } - } - private Mono<RicData> checkRicPolicies(RicData ric) { return ric.getClient().getPolicyIdentities() // .flatMap(ricP -> validateInstances(ricP, ric)); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 94e873ed..2d282f33 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -93,17 +94,36 @@ public class RicSynchronizationTask { return this.a1ClientFactory.createA1Client(ric) // .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) // .flatMapMany(client -> runSynchronization(ric, client)) // - .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // - .collectList() // - .map(notUsed -> ric) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // ric.setState(RicState.UNAVAILABLE); // + deletePoliciesIfNotRecreatable(t, ric); }) // + .collectList() // .flatMap(notUsed -> onSynchronizationComplete(ric)) // .onErrorResume(t -> Mono.just(ric)); } + /** + * If a 4xx error is received, allpolicies are deleted. This is just to avoid + * cyclical receovery due to that the NearRT RIC cannot accept a previously + * policy. + */ + private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) { + if (throwable instanceof WebClientResponseException) { + WebClientResponseException responseException = (WebClientResponseException) throwable; + if (responseException.getStatusCode().is4xxClientError()) { + deleteAllPoliciesInRepository(ric); + } + } + } + + private void deleteAllPoliciesInRepository(Ric ric) { + for (Policy policy : policies.getForRic(ric.id())) { + this.policies.remove(policy); + } + } + public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { return a1Client.getPolicyTypeIdentities() // .doOnNext(x -> ric.clearSupportedPolicyTypes()) // @@ -134,19 +154,6 @@ public class RicSynchronizationTask { .map(list -> ric); } - private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage()); - deleteAllPoliciesInRepository(ric); - - Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client)); - Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(A1Client::deleteAllPolicies) // - .doOnComplete(() -> deleteAllPoliciesInRepository(ric)); - - return Flux.concat(synchronizedTypes, deletePoliciesInRic); - } - private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); @@ -161,12 +168,6 @@ public class RicSynchronizationTask { return pt; } - private void deleteAllPoliciesInRepository(Ric ric) { - for (Policy policy : policies.getForRic(ric.id())) { - this.policies.remove(policy); - } - } - private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) { logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId()); return a1Client.putPolicy(policy) // diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java index 875698a8..396a4063 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java @@ -157,14 +157,10 @@ class RicSupervisionTest { doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.UNAVAILABLE); rics.put(RIC_1); - RicSupervision supervisorUnderTest = spy(createRicSupervision()); - doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask(); doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any()); - supervisorUnderTest.checkAllRics(); - verify(supervisorUnderTest).checkAllRics(); verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java index 2e5424db..0ea0a8cb 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java @@ -56,6 +56,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -148,6 +149,21 @@ class RicSynchronizationTaskTest { } @Test + void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() { + setUpCreationOfA1Client(); + simulateRicWithNoPolicyTypes(); + policies.put(policy1); + WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null); + when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception)); + RicSynchronizationTask synchronizerUnderTest = createTask(); + ric1.setState(RicState.AVAILABLE); + synchronizerUnderTest.run(ric1); + await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState())); + assertThat(policies.size()).isZero(); + assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE); + } + + @Test void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() { rics.put(ric1); ric1.setState(RicState.AVAILABLE); @@ -233,58 +249,6 @@ class RicSynchronizationTaskTest { assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE); } - @Test - void ricIdleAndErrorDeletingPoliciesFirstTime_thenSynchronizationWithDeletionOfPolicies() { - ric1.setState(RicState.AVAILABLE); - rics.put(ric1); - - policies.put(policy1); - - setUpCreationOfA1Client(); - simulateRicWithNoPolicyTypes(); - - when(a1ClientMock.deleteAllPolicies()) // - .thenReturn(Flux.error(new Exception("Exception"))) // - .thenReturn(Flux.just("OK")); - - RicSynchronizationTask synchronizerUnderTest = createTask(); - - ric1.setState(RicState.UNAVAILABLE); - synchronizerUnderTest.run(ric1); - await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState())); - - verify(a1ClientMock, times(2)).deleteAllPolicies(); - verifyNoMoreInteractions(a1ClientMock); - - assertThat(policyTypes.size()).isZero(); - assertThat(policies.size()).isZero(); - assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE); - } - - @Test - void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() { - setUpCreationOfA1Client(); - simulateRicWithNoPolicyTypes(); - - policies.put(policy1); - - String originalErrorMessage = "Exception"; - when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(new Exception(originalErrorMessage))); - - RicSynchronizationTask synchronizerUnderTest = createTask(); - - ric1.setState(RicState.AVAILABLE); - synchronizerUnderTest.run(ric1); - await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState())); - - verify(a1ClientMock, times(2)).deleteAllPolicies(); - verifyNoMoreInteractions(a1ClientMock); - - assertThat(policyTypes.size()).isZero(); - assertThat(policies.size()).isZero(); - assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE); - } - private void setUpCreationOfA1Client() { when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock)); doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(); |