aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-06-10 12:43:06 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2022-06-10 13:11:04 +0200
commit87b1ff527a8110f592b4b5e2fcf13b8cd43b6d1f (patch)
treef375243d253097b84250550b13ee7558936087e9
parent779509036cdadb5735d336ed93a3bfac5a0be72a (diff)
NONRTRIC PMS, Sporadic instability
Some further simplifications and added test. Issue-ID: CCSDK-3683 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech> Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java47
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java5
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java10
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java1
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java28
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java24
6 files changed, 47 insertions, 68 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index fea242ae..983e92e6 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -36,11 +36,9 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
@@ -130,7 +128,7 @@ public class RefreshConfigTask {
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig, CONCURRENCY) //
.flatMap(this::handleUpdatedRicConfig) //
- .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+ .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal));
}
private Flux<Long> regularInterval() {
@@ -170,40 +168,16 @@ public class RefreshConfigTask {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
}
- /**
- * for an added RIC after a restart it is nesessary to get the suypported policy
- * types from the RIC unless a full synchronization is wanted.
- *
- * @param ric the ric to get supprted types from
- * @return the same ric
- */
- private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
- logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
- // Synchronize the policy types
- ric.setState(RicState.SYNCHRONIZING);
- return this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
- .collectList() //
- .map(list -> ric) //
- .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
- .doOnError(t -> {
- logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
- ric.setState(RicState.UNAVAILABLE); //
- }) //
- .onErrorResume(t -> Mono.just(ric));
- }
-
public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().getRicId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
-
- return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
- .doOnNext(this::addRic) //
- .flatMap(this::notifyServicesRicAvailable) //
- .flatMap(notUsed -> Mono.just(event));
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ this.addRic(ric);
+ return this.synchronizationTask().synchronizeRic(ric) //
+ .map(notUsed -> event);
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
@@ -231,17 +205,6 @@ public class RefreshConfigTask {
logger.debug("Added RIC: {}", ric.id());
}
- private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
- if (ric.getState() == RicState.AVAILABLE) {
- ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
- return callbacks.notifyServicesRicAvailable(ric, services) //
- .collectList() //
- .map(list -> ric);
- } else {
- return Mono.just(ric);
- }
- }
-
/**
* Reads the configuration from file.
*/
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index 177778b8..e3edaf44 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -106,11 +106,12 @@ public class RicSupervision {
createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
}
- private Flux<RicData> createTask() {
+ private Flux<Ric> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.onErrorResume(t -> Flux.empty()) //
- .flatMap(this::checkOneRic, CONCURRENCY);
+ .flatMap(this::checkOneRic, CONCURRENCY) //
+ .map(ricData -> ricData.ric);
}
private Mono<RicData> checkOneRic(RicData ricData) {
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
index dcb03a5a..c689097d 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
@@ -920,11 +920,16 @@ class ApplicationTest {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
List<ConcurrencyTestRunnable> tests = new ArrayList<>();
- a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+ a1ClientFactory.setResponseDelay(Duration.ofMillis(2));
addRic("ric");
addPolicyType("type1", "ric");
addPolicyType("type2", "ric");
+ final String NON_RESPONDING_RIC = "NonRespondingRic";
+ Ric nonRespondingRic = addRic(NON_RESPONDING_RIC);
+ MockA1Client a1Client = a1ClientFactory.getOrCreateA1Client(NON_RESPONDING_RIC);
+ a1Client.setErrorInject("errorInject");
+
for (int i = 0; i < 10; ++i) {
AsyncRestClient restClient = restClient();
ConcurrencyTestRunnable test =
@@ -942,6 +947,9 @@ class ApplicationTest {
}
assertThat(policies.size()).isZero();
logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
+
+ assertThat(nonRespondingRic.getState()).isEqualTo(RicState.UNAVAILABLE);
+ nonRespondingRic.setState(RicState.AVAILABLE);
}
private AsyncRestClient restClient(String baseUrl, boolean useTrustValidation) {
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
index 203cc3e5..2ecb9c28 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
@@ -87,6 +87,7 @@ class RefreshConfigTaskTest {
private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
boolean stubConfigFileExists) {
SecurityContext secContext = new SecurityContext("");
+
RefreshConfigTask obj =
spy(new RefreshConfigTask(configurationFileMock, appConfig, rics, policies, new Services(appConfig),
new PolicyTypes(appConfig), new A1ClientFactory(appConfig, secContext), secContext));
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
index 396a4063..b2bf58e4 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
@@ -24,8 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.time.Instant;
@@ -148,7 +148,7 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
@@ -161,10 +161,8 @@ class RicSupervisionTest {
doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
supervisorUnderTest.checkAllRics();
- verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@@ -179,7 +177,7 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING);
}
@@ -196,7 +194,8 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@@ -218,9 +217,8 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@@ -242,11 +240,9 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
- assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
+ assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@Test
@@ -263,7 +259,8 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@@ -286,9 +283,8 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@@ -315,9 +311,7 @@ class RicSupervisionTest {
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
index 2a3b28ef..b76f1e72 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java
@@ -28,6 +28,8 @@ import java.time.Duration;
import java.util.List;
import java.util.Vector;
+import lombok.Setter;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
@@ -44,7 +46,12 @@ import reactor.core.publisher.MonoSink;
public class MockA1Client implements A1Client {
Policies policies;
private final PolicyTypes policyTypes;
- private final Duration asynchDelay;
+
+ @Setter
+ private Duration asynchDelay;
+
+ @Setter
+ private String errorInject;
public MockA1Client(String ricId, ApplicationConfig appConfig, PolicyTypes policyTypes, Duration asynchDelay) {
this.policyTypes = policyTypes;
@@ -117,14 +124,19 @@ public class MockA1Client implements A1Client {
}
private <T> Mono<T> mono(T value) {
- if (this.asynchDelay.isZero()) {
- return Mono.just(value);
- } else {
- return Mono.create(monoSink -> asynchResponse(monoSink, value));
+ Mono<T> res = Mono.just(value);
+ if (!this.asynchDelay.isZero()) {
+ res = Mono.create(monoSink -> asynchResponse(monoSink, value));
}
+
+ if (this.errorInject != null) {
+ res = res.flatMap(x -> monoError(this.errorInject, HttpStatus.BAD_GATEWAY));
+ }
+
+ return res;
}
- public static Mono<String> monoError(String responseBody, HttpStatus status) {
+ public static <T> Mono<T> monoError(String responseBody, HttpStatus status) {
byte[] responseBodyBytes = responseBody.getBytes(StandardCharsets.UTF_8);
WebClientResponseException a1Exception = new WebClientResponseException(status.value(),
status.getReasonPhrase(), null, responseBodyBytes, StandardCharsets.UTF_8, null);