diff options
6 files changed, 42 insertions, 13 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java index d4c264da..108424d4 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.List; import org.json.JSONArray; -import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +62,7 @@ class A1AdapterJsonHelper { } } return Flux.fromIterable(arrayList); - } catch (JSONException ex) { // invalid json + } catch (Exception ex) { // invalid json logger.debug("Invalid json {}", ex.getMessage()); return Flux.error(ex); } @@ -88,7 +87,7 @@ class A1AdapterJsonHelper { JSONObject outputJson = new JSONObject(response); JSONObject responseParams = outputJson.getJSONObject(OUTPUT); return Mono.just(responseParams); - } catch (JSONException ex) { // invalid json + } catch (Exception ex) { // invalid json logger.debug("Invalid json {}", ex.getMessage()); return Mono.error(ex); } @@ -110,7 +109,7 @@ class A1AdapterJsonHelper { JSONObject schemaObject = jsonObject.getJSONObject("policySchema"); String schemaString = schemaObject.toString(); return Mono.just(schemaString); - } catch (JSONException ex) { // invalid json + } catch (Exception ex) { // invalid json logger.debug("Invalid json {}", ex.getMessage()); return Mono.error(ex); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java index 6216a4df..764afc85 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java @@ -285,17 +285,17 @@ public class CcsdkA1AdapterClient implements A1Client { return restClient .postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.getUserName(), this.controllerConfig.getPassword()) // - .flatMap(this::extractResponseBody); + .flatMap(resp -> extractResponseBody(resp, ricUrl)); } - private Mono<String> extractResponse(JSONObject responseOutput) { + private Mono<String> extractResponse(JSONObject responseOutput, String ricUrl) { AdapterOutput output = gson.fromJson(responseOutput.toString(), AdapterOutput.class); String body = output.body == null ? "" : output.body; if (HttpStatus.valueOf(output.httpStatus).is2xxSuccessful()) { return Mono.just(body); } else { - logger.debug("Error response: {} {}", output.httpStatus, body); + logger.debug("Error response: {} {}, from: {}", output.httpStatus, body, ricUrl); byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8); HttpStatus httpStatus = HttpStatus.valueOf(output.httpStatus); WebClientResponseException responseException = new WebClientResponseException(httpStatus.value(), @@ -305,9 +305,9 @@ public class CcsdkA1AdapterClient implements A1Client { } } - private Mono<String> extractResponseBody(String responseStr) { + private Mono<String> extractResponseBody(String responseStr, String ricUrl) { return A1AdapterJsonHelper.getOutput(responseStr) // - .flatMap(this::extractResponse); + .flatMap(responseOutput -> extractResponse(responseOutput, ricUrl)); } private String controllerUrl(String rpcName) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index 62018424..500ddd2a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -66,6 +66,7 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClientException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; @@ -277,6 +278,9 @@ public class PolicyController { if (throwable instanceof WebClientResponseException) { WebClientResponseException e = (WebClientResponseException) throwable; return ErrorResponse.createMono(e.getResponseBodyAsString(), e.getStatusCode()); + } else if (throwable instanceof WebClientException) { + WebClientException e = (WebClientException) throwable; + return ErrorResponse.createMono(e.getMessage(), HttpStatus.BAD_GATEWAY); } else if (throwable instanceof RejectionException) { RejectionException e = (RejectionException) throwable; return ErrorResponse.createMono(e.getMessage(), e.getStatus()); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index f90d462e..177778b8 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -109,6 +109,7 @@ public class RicSupervision { private Flux<RicData> createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // + .onErrorResume(t -> Flux.empty()) // .flatMap(this::checkOneRic, CONCURRENCY); } @@ -153,8 +154,9 @@ public class RicSupervision { } private Mono<RicData> createRicData(Ric ric) { - return Mono.just(ric) // - .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // + return this.a1ClientFactory.createA1Client(ric) // + .doOnError(t -> logger.debug("Could not create A1 client for ric: {}, reason: {}", ric.id(), + t.getMessage())) // .map(a1Client -> new RicData(ric, a1Client)); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 2d282f33..b3afa7cd 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -40,6 +40,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; /** * Synchronizes the content of a Near-RT RIC with the content in the repository. @@ -96,12 +97,19 @@ public class RicSynchronizationTask { .flatMapMany(client -> runSynchronization(ric, client)) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // - ric.setState(RicState.UNAVAILABLE); // deletePoliciesIfNotRecreatable(t, ric); }) // .collectList() // .flatMap(notUsed -> onSynchronizationComplete(ric)) // - .onErrorResume(t -> Mono.just(ric)); + .onErrorResume(t -> Mono.just(ric)) // + .doFinally(signal -> onFinally(signal, ric)); + } + + private void onFinally(SignalType signal, Ric ric) { + if (ric.getState().equals(RicState.SYNCHRONIZING)) { + logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal); + ric.setState(RicState.UNAVAILABLE); // + } } /** diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java index 0ea0a8cb..6386441c 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java @@ -47,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFact import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -56,6 +57,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; +import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; @@ -164,6 +166,20 @@ class RicSynchronizationTaskTest { } @Test + void testConnectionError() { + setUpCreationOfA1Client(); + simulateRicWithNoPolicyTypes(); + policies.put(policy1); + WebClientRequestException exception = + new WebClientRequestException(new ServiceException("x"), null, null, null); + when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception)); + RicSynchronizationTask synchronizerUnderTest = createTask(); + ric1.setState(RicState.AVAILABLE); + synchronizerUnderTest.run(ric1); + await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState())); + } + + @Test void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() { rics.put(ric1); ric1.setState(RicState.AVAILABLE); |