diff options
Diffstat (limited to 'a1-policy-management/src/main/java')
7 files changed, 63 insertions, 75 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java index c72f196a..d4c264da 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java @@ -96,12 +96,11 @@ class A1AdapterJsonHelper { public static Mono<String> getValueFromResponse(String response, String key) { return getOutput(response) // - .flatMap(responseParams -> { + .map(responseParams -> { if (!responseParams.has(key)) { - return Mono.just(""); + return ""; } - String value = responseParams.get(key).toString(); - return Mono.just(value); + return responseParams.get(key).toString(); }); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index c3be9b4d..4f2770bf 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -66,96 +66,89 @@ public class AsyncRestClient { logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty(); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.post() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .body(bodyProducer, String.class); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec<?> request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); } public Mono<String> post(String uri, @Nullable String body) { return postForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) { Object traceTag = createTraceTag(); logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.post() // - .uri(uri) // - .headers(headers -> headers.setBasicAuth(username, password)) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request) // - .flatMap(this::toBody); - }); + + RequestHeadersSpec<?> request = getWebClient() // + .post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .map(this::toBody); } public Mono<ResponseEntity<String>> putForEntity(String uri, String body) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.put() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec<?> request = getWebClient() // + .put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); } public Mono<ResponseEntity<String>> putForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: <empty>", traceTag); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.put() // - .uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec<?> request = getWebClient() // + .put() // + .uri(uri); + return retrieve(traceTag, request); } public Mono<String> put(String uri, String body) { return putForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono<ResponseEntity<String>> getForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.get().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec<?> request = getWebClient() // + .get() // + .uri(uri); + return retrieve(traceTag, request); } public Mono<String> get(String uri) { return getForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono<ResponseEntity<String>> deleteForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec<?> request = client.delete().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec<?> request = getWebClient() // + .delete() // + .uri(uri); + return retrieve(traceTag, request); } public Mono<String> delete(String uri) { return deleteForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) { @@ -184,11 +177,11 @@ public class AsyncRestClient { } } - private Mono<String> toBody(ResponseEntity<String> entity) { + private String toBody(ResponseEntity<String> entity) { if (entity.getBody() == null) { - return Mono.just(""); + return ""; } else { - return Mono.just(entity.getBody()); + return entity.getBody(); } } @@ -229,11 +222,10 @@ public class AsyncRestClient { .build(); } - private Mono<WebClient> getWebClient() { + private WebClient getWebClient() { if (this.webClient == null) { this.webClient = buildWebClient(baseUrl); } - return Mono.just(buildWebClient(baseUrl)); + return this.webClient; } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java index 3d2b55a2..bead6d11 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java @@ -205,7 +205,7 @@ public class PolicyController { .flatMap(client -> client.deletePolicy(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(notUsed -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))) + .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) // .onErrorResume(this::handleException); } @@ -273,7 +273,7 @@ public class PolicyController { .doOnNext(notUsed -> policies.put(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(trowable -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // + .map(notUsed -> new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK)) // .onErrorResume(this::handleException); } @@ -406,7 +406,7 @@ public class PolicyController { return a1ClientFactory.createA1Client(policy.getRic()) // .flatMap(client -> client.getPolicyStatus(policy)) // - .flatMap(status -> Mono.just(new ResponseEntity<>(status, HttpStatus.OK))) + .map(status -> new ResponseEntity<>(status, HttpStatus.OK)) // .onErrorResume(this::handleException); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index 036958d3..134d6d7a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -207,7 +207,7 @@ public class PolicyController { .flatMap(client -> client.deletePolicy(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(notUsed -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))) + .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) // .onErrorResume(this::handleException); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 6177ee1b..e488af5f 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -226,7 +226,7 @@ public class RefreshConfigTask { return this.a1ClientFactory.createA1Client(ric) // .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // .collectList() // - .flatMap(list -> Mono.just(ric)) // + .map(list -> ric) // .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) // .doOnError(t -> { logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage()); @@ -243,7 +243,7 @@ public class RefreshConfigTask { logger.debug("RIC added {}", ricId); return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) // - .flatMap(this::addRic) // + .doOnNext(this::addRic) // .flatMap(this::notifyServicesRicAvailable) // .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { @@ -256,7 +256,7 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(new Ric(updatedInfo.getRicConfig())).block(); + addRic(new Ric(updatedInfo.getRicConfig())); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } @@ -265,14 +265,12 @@ public class RefreshConfigTask { } } - Mono<Ric> addRic(Ric ric) { + void addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { this.policies.restoreFromDatabase(ric, this.policyTypes); } logger.debug("Added RIC: {}", ric.id()); - - return Mono.just(ric); } private Mono<Ric> notifyServicesRicAvailable(Ric ric) { @@ -280,7 +278,7 @@ public class RefreshConfigTask { ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); return callbacks.notifyServicesRicAvailable(ric, services) // .collectList() // - .flatMap(list -> Mono.just(ric)); + .map(list -> ric); } else { return Mono.just(ric); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 87689012..b4c6595d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -152,7 +152,7 @@ public class RicSupervision { private Mono<RicData> createRicData(Ric ric) { return Mono.just(ric) // .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // - .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client))); + .map(a1Client -> new RicData(ric, a1Client)); } private Mono<RicData> checkRicState(RicData ric) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 0ccccb73..0552df6a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -97,13 +97,12 @@ public class RicSynchronizationTask { } public Mono<Ric> synchronizeRic(Ric ric) { - return Mono.just(ric) // - .flatMap(notUsed -> setRicState(ric)) // + return setRicState(ric) // .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // .flatMapMany(client -> runSynchronization(ric, client)) // .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // .collectList() // - .flatMap(notUsed -> Mono.just(ric)) // + .map(notUsed -> ric) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // ric.setState(RicState.UNAVAILABLE); // @@ -152,7 +151,7 @@ public class RicSynchronizationTask { ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); return callbacks.notifyServicesRicAvailable(ric, services) // .collectList() // - .flatMap(list -> Mono.just(ric)); + .map(list -> ric); } private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) { @@ -173,13 +172,13 @@ public class RicSynchronizationTask { return Mono.just(policyTypes.get(policyTypeId)); } return a1Client.getPolicyTypeSchema(policyTypeId) // - .flatMap(schema -> createPolicyType(policyTypeId, schema)); + .map(schema -> createPolicyType(policyTypeId, schema)); } - private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) { + private PolicyType createPolicyType(String policyTypeId, String schema) { PolicyType pt = PolicyType.builder().id(policyTypeId).schema(schema).build(); policyTypes.put(pt); - return Mono.just(pt); + return pt; } private void deleteAllPoliciesInRepository(Ric ric) { |