diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2022-06-10 12:43:06 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2022-06-10 13:11:04 +0200 |
commit | 87b1ff527a8110f592b4b5e2fcf13b8cd43b6d1f (patch) | |
tree | f375243d253097b84250550b13ee7558936087e9 /a1-policy-management | |
parent | 779509036cdadb5735d336ed93a3bfac5a0be72a (diff) |
NONRTRIC PMS, Sporadic instability
Some further simplifications and added test.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
Diffstat (limited to 'a1-policy-management')
6 files changed, 47 insertions, 68 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index fea242ae..983e92e6 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -36,11 +36,9 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; -import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; -import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.slf4j.Logger; @@ -130,7 +128,7 @@ public class RefreshConfigTask { .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig, CONCURRENCY) // .flatMap(this::handleUpdatedRicConfig) // - .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); + .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal)); } private Flux<Long> regularInterval() { @@ -170,40 +168,16 @@ public class RefreshConfigTask { return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics); } - /** - * for an added RIC after a restart it is nesessary to get the suypported policy - * types from the RIC unless a full synchronization is wanted. - * - * @param ric the ric to get supprted types from - * @return the same ric - */ - private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) { - logger.debug("Synchronizing policy types for new RIC: {}", ric.id()); - // Synchronize the policy types - ric.setState(RicState.SYNCHRONIZING); - return this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // - .collectList() // - .map(list -> ric) // - .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) // - .doOnError(t -> { - logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage()); - ric.setState(RicState.UNAVAILABLE); // - }) // - .onErrorResume(t -> Mono.just(ric)); - } - public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { synchronized (this.rics) { String ricId = updatedInfo.getRicConfig().getRicId(); RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - - return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) // - .doOnNext(this::addRic) // - .flatMap(this::notifyServicesRicAvailable) // - .flatMap(notUsed -> Mono.just(event)); + Ric ric = new Ric(updatedInfo.getRicConfig()); + this.addRic(ric); + return this.synchronizationTask().synchronizeRic(ric) // + .map(notUsed -> event); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); Ric ric = rics.remove(ricId); @@ -231,17 +205,6 @@ public class RefreshConfigTask { logger.debug("Added RIC: {}", ric.id()); } - private Mono<Ric> notifyServicesRicAvailable(Ric ric) { - if (ric.getState() == RicState.AVAILABLE) { - ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); - return callbacks.notifyServicesRicAvailable(ric, services) // - .collectList() // - .map(list -> ric); - } else { - return Mono.just(ric); - } - } - /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 177778b8..e3edaf44 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -106,11 +106,12 @@ public class RicSupervision { createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed")); } - private Flux<RicData> createTask() { + private Flux<Ric> createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // .onErrorResume(t -> Flux.empty()) // - .flatMap(this::checkOneRic, CONCURRENCY); + .flatMap(this::checkOneRic, CONCURRENCY) // + .map(ricData -> ricData.ric); } private Mono<RicData> checkOneRic(RicData ricData) { diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java index dcb03a5a..c689097d 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java @@ -920,11 +920,16 @@ class ApplicationTest { final Instant startTime = Instant.now(); List<Thread> threads = new ArrayList<>(); List<ConcurrencyTestRunnable> tests = new ArrayList<>(); - a1ClientFactory.setResponseDelay(Duration.ofMillis(1)); + a1ClientFactory.setResponseDelay(Duration.ofMillis(2)); addRic("ric"); addPolicyType("type1", "ric"); addPolicyType("type2", "ric"); + final String NON_RESPONDING_RIC = "NonRespondingRic"; + Ric nonRespondingRic = addRic(NON_RESPONDING_RIC); + MockA1Client a1Client = a1ClientFactory.getOrCreateA1Client(NON_RESPONDING_RIC); + a1Client.setErrorInject("errorInject"); + for (int i = 0; i < 10; ++i) { AsyncRestClient restClient = restClient(); ConcurrencyTestRunnable test = @@ -942,6 +947,9 @@ class ApplicationTest { } assertThat(policies.size()).isZero(); logger.info("Concurrency test took " + Duration.between(startTime, Instant.now())); + + assertThat(nonRespondingRic.getState()).isEqualTo(RicState.UNAVAILABLE); + nonRespondingRic.setState(RicState.AVAILABLE); } private AsyncRestClient restClient(String baseUrl, boolean useTrustValidation) { diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java index 203cc3e5..2ecb9c28 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java @@ -87,6 +87,7 @@ class RefreshConfigTaskTest { private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies, boolean stubConfigFileExists) { SecurityContext secContext = new SecurityContext(""); + RefreshConfigTask obj = spy(new RefreshConfigTask(configurationFileMock, appConfig, rics, policies, new Services(appConfig), new PolicyTypes(appConfig), new A1ClientFactory(appConfig, secContext), secContext)); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java index 396a4063..b2bf58e4 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java @@ -24,8 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.time.Instant; @@ -148,7 +148,7 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verifyNoMoreInteractions(supervisorUnderTest); + verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1); assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE); } @@ -161,10 +161,8 @@ class RicSupervisionTest { doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask(); doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any()); supervisorUnderTest.checkAllRics(); - verify(supervisorUnderTest).checkAllRics(); - verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); - verifyNoMoreInteractions(supervisorUnderTest); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @@ -179,7 +177,7 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verifyNoMoreInteractions(supervisorUnderTest); + verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1); assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING); } @@ -196,7 +194,8 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verifyNoMoreInteractions(supervisorUnderTest); + verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @@ -218,9 +217,8 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); - verifyNoMoreInteractions(supervisorUnderTest); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @@ -242,11 +240,9 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); - verifyNoMoreInteractions(supervisorUnderTest); - assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @Test @@ -263,7 +259,8 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verifyNoMoreInteractions(supervisorUnderTest); + verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @@ -286,9 +283,8 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); - verifyNoMoreInteractions(supervisorUnderTest); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @@ -315,9 +311,7 @@ class RicSupervisionTest { supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); - verify(supervisorUnderTest).createSynchronizationTask(); verify(synchronizationTaskMock).synchronizeRic(RIC_1); - verifyNoMoreInteractions(supervisorUnderTest); assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java index 2a3b28ef..b76f1e72 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java @@ -28,6 +28,8 @@ import java.time.Duration; import java.util.List; import java.util.Vector; +import lombok.Setter; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; @@ -44,7 +46,12 @@ import reactor.core.publisher.MonoSink; public class MockA1Client implements A1Client { Policies policies; private final PolicyTypes policyTypes; - private final Duration asynchDelay; + + @Setter + private Duration asynchDelay; + + @Setter + private String errorInject; public MockA1Client(String ricId, ApplicationConfig appConfig, PolicyTypes policyTypes, Duration asynchDelay) { this.policyTypes = policyTypes; @@ -117,14 +124,19 @@ public class MockA1Client implements A1Client { } private <T> Mono<T> mono(T value) { - if (this.asynchDelay.isZero()) { - return Mono.just(value); - } else { - return Mono.create(monoSink -> asynchResponse(monoSink, value)); + Mono<T> res = Mono.just(value); + if (!this.asynchDelay.isZero()) { + res = Mono.create(monoSink -> asynchResponse(monoSink, value)); } + + if (this.errorInject != null) { + res = res.flatMap(x -> monoError(this.errorInject, HttpStatus.BAD_GATEWAY)); + } + + return res; } - public static Mono<String> monoError(String responseBody, HttpStatus status) { + public static <T> Mono<T> monoError(String responseBody, HttpStatus status) { byte[] responseBodyBytes = responseBody.getBytes(StandardCharsets.UTF_8); WebClientResponseException a1Exception = new WebClientResponseException(status.value(), status.getReasonPhrase(), null, responseBodyBytes, StandardCharsets.UTF_8, null); |