aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-06-09 10:47:17 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2022-06-09 13:24:33 +0200
commit779509036cdadb5735d336ed93a3bfac5a0be72a (patch)
tree5455d02091cd20ec2f7217d61cd5673b35a9be64
parent3f3a4d71e80ab134af52489e519f88be9786c860 (diff)
NONRTRIC PMS, Sporadic instability
Attempt to stablize the synch. Issue-ID: CCSDK-3683 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech> Change-Id: Ieda858e76082fd5224ac43f153e8967f871322d8
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java7
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java10
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java6
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java12
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java16
6 files changed, 42 insertions, 13 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
index d4c264da..108424d4 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
-import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ class A1AdapterJsonHelper {
}
}
return Flux.fromIterable(arrayList);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Flux.error(ex);
}
@@ -88,7 +87,7 @@ class A1AdapterJsonHelper {
JSONObject outputJson = new JSONObject(response);
JSONObject responseParams = outputJson.getJSONObject(OUTPUT);
return Mono.just(responseParams);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
@@ -110,7 +109,7 @@ class A1AdapterJsonHelper {
JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
String schemaString = schemaObject.toString();
return Mono.just(schemaString);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
index 6216a4df..764afc85 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
@@ -285,17 +285,17 @@ public class CcsdkA1AdapterClient implements A1Client {
return restClient
.postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.getUserName(),
this.controllerConfig.getPassword()) //
- .flatMap(this::extractResponseBody);
+ .flatMap(resp -> extractResponseBody(resp, ricUrl));
}
- private Mono<String> extractResponse(JSONObject responseOutput) {
+ private Mono<String> extractResponse(JSONObject responseOutput, String ricUrl) {
AdapterOutput output = gson.fromJson(responseOutput.toString(), AdapterOutput.class);
String body = output.body == null ? "" : output.body;
if (HttpStatus.valueOf(output.httpStatus).is2xxSuccessful()) {
return Mono.just(body);
} else {
- logger.debug("Error response: {} {}", output.httpStatus, body);
+ logger.debug("Error response: {} {}, from: {}", output.httpStatus, body, ricUrl);
byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
HttpStatus httpStatus = HttpStatus.valueOf(output.httpStatus);
WebClientResponseException responseException = new WebClientResponseException(httpStatus.value(),
@@ -305,9 +305,9 @@ public class CcsdkA1AdapterClient implements A1Client {
}
}
- private Mono<String> extractResponseBody(String responseStr) {
+ private Mono<String> extractResponseBody(String responseStr, String ricUrl) {
return A1AdapterJsonHelper.getOutput(responseStr) //
- .flatMap(this::extractResponse);
+ .flatMap(responseOutput -> extractResponse(responseOutput, ricUrl));
}
private String controllerUrl(String rpcName) {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
index 62018424..500ddd2a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
@@ -66,6 +66,7 @@ import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClientException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
@@ -277,6 +278,9 @@ public class PolicyController {
if (throwable instanceof WebClientResponseException) {
WebClientResponseException e = (WebClientResponseException) throwable;
return ErrorResponse.createMono(e.getResponseBodyAsString(), e.getStatusCode());
+ } else if (throwable instanceof WebClientException) {
+ WebClientException e = (WebClientException) throwable;
+ return ErrorResponse.createMono(e.getMessage(), HttpStatus.BAD_GATEWAY);
} else if (throwable instanceof RejectionException) {
RejectionException e = (RejectionException) throwable;
return ErrorResponse.createMono(e.getMessage(), e.getStatus());
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index f90d462e..177778b8 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -109,6 +109,7 @@ public class RicSupervision {
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
+ .onErrorResume(t -> Flux.empty()) //
.flatMap(this::checkOneRic, CONCURRENCY);
}
@@ -153,8 +154,9 @@ public class RicSupervision {
}
private Mono<RicData> createRicData(Ric ric) {
- return Mono.just(ric) //
- .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
+ return this.a1ClientFactory.createA1Client(ric) //
+ .doOnError(t -> logger.debug("Could not create A1 client for ric: {}, reason: {}", ric.id(),
+ t.getMessage())) //
.map(a1Client -> new RicData(ric, a1Client));
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 2d282f33..b3afa7cd 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -40,6 +40,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
/**
* Synchronizes the content of a Near-RT RIC with the content in the repository.
@@ -96,12 +97,19 @@ public class RicSynchronizationTask {
.flatMapMany(client -> runSynchronization(ric, client)) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
- ric.setState(RicState.UNAVAILABLE); //
deletePoliciesIfNotRecreatable(t, ric);
}) //
.collectList() //
.flatMap(notUsed -> onSynchronizationComplete(ric)) //
- .onErrorResume(t -> Mono.just(ric));
+ .onErrorResume(t -> Mono.just(ric)) //
+ .doFinally(signal -> onFinally(signal, ric));
+ }
+
+ private void onFinally(SignalType signal, Ric ric) {
+ if (ric.getState().equals(RicState.SYNCHRONIZING)) {
+ logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal);
+ ric.setState(RicState.UNAVAILABLE); //
+ }
}
/**
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
index 0ea0a8cb..6386441c 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
@@ -47,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFact
import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -56,6 +57,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
@@ -164,6 +166,20 @@ class RicSynchronizationTaskTest {
}
@Test
+ void testConnectionError() {
+ setUpCreationOfA1Client();
+ simulateRicWithNoPolicyTypes();
+ policies.put(policy1);
+ WebClientRequestException exception =
+ new WebClientRequestException(new ServiceException("x"), null, null, null);
+ when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
+ RicSynchronizationTask synchronizerUnderTest = createTask();
+ ric1.setState(RicState.AVAILABLE);
+ synchronizerUnderTest.run(ric1);
+ await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
+ }
+
+ @Test
void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
rics.put(ric1);
ric1.setState(RicState.AVAILABLE);