From cf498072469966eae4bbd9a3a810a3b7a65f98ae Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Thu, 11 Nov 2021 09:40:14 +0100 Subject: A1 Policy Management Minor simplifications. Issue-ID: CCSDK-3495 Signed-off-by: PatrikBuhr Change-Id: Id90cf9dfeb2b56561baf19391ddc858fc762888c --- .../clients/A1AdapterJsonHelper.java | 7 +- .../clients/AsyncRestClient.java | 96 ++++++++++------------ .../controllers/v1/PolicyController.java | 6 +- .../controllers/v2/PolicyController.java | 2 +- .../tasks/RefreshConfigTask.java | 12 ++- .../tasks/RicSupervision.java | 2 +- .../tasks/RicSynchronizationTask.java | 13 ++- 7 files changed, 63 insertions(+), 75 deletions(-) (limited to 'a1-policy-management/src/main') diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java index c72f196a..d4c264da 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java @@ -96,12 +96,11 @@ class A1AdapterJsonHelper { public static Mono getValueFromResponse(String response, String key) { return getOutput(response) // - .flatMap(responseParams -> { + .map(responseParams -> { if (!responseParams.has(key)) { - return Mono.just(""); + return ""; } - String value = responseParams.get(key).toString(); - return Mono.just(value); + return responseParams.get(key).toString(); }); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index c3be9b4d..4f2770bf 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -66,96 +66,89 @@ public class AsyncRestClient { logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .body(bodyProducer, String.class); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); } public Mono post(String uri, @Nullable String body) { return postForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono postWithAuthHeader(String uri, String body, String username, String password) { Object traceTag = createTraceTag(); logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .headers(headers -> headers.setBasicAuth(username, password)) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request) // - .flatMap(this::toBody); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .map(this::toBody); } public Mono> putForEntity(String uri, String body) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); } public Mono> putForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: ", traceTag); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri); + return retrieve(traceTag, request); } public Mono put(String uri, String body) { return putForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> getForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.get().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient() // + .get() // + .uri(uri); + return retrieve(traceTag, request); } public Mono get(String uri) { return getForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> deleteForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.delete().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient() // + .delete() // + .uri(uri); + return retrieve(traceTag, request); } public Mono delete(String uri) { return deleteForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { @@ -184,11 +177,11 @@ public class AsyncRestClient { } } - private Mono toBody(ResponseEntity entity) { + private String toBody(ResponseEntity entity) { if (entity.getBody() == null) { - return Mono.just(""); + return ""; } else { - return Mono.just(entity.getBody()); + return entity.getBody(); } } @@ -229,11 +222,10 @@ public class AsyncRestClient { .build(); } - private Mono getWebClient() { + private WebClient getWebClient() { if (this.webClient == null) { this.webClient = buildWebClient(baseUrl); } - return Mono.just(buildWebClient(baseUrl)); + return this.webClient; } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java index 3d2b55a2..bead6d11 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java @@ -205,7 +205,7 @@ public class PolicyController { .flatMap(client -> client.deletePolicy(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(notUsed -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))) + .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) // .onErrorResume(this::handleException); } @@ -273,7 +273,7 @@ public class PolicyController { .doOnNext(notUsed -> policies.put(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(trowable -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // + .map(notUsed -> new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK)) // .onErrorResume(this::handleException); } @@ -406,7 +406,7 @@ public class PolicyController { return a1ClientFactory.createA1Client(policy.getRic()) // .flatMap(client -> client.getPolicyStatus(policy)) // - .flatMap(status -> Mono.just(new ResponseEntity<>(status, HttpStatus.OK))) + .map(status -> new ResponseEntity<>(status, HttpStatus.OK)) // .onErrorResume(this::handleException); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index 036958d3..134d6d7a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -207,7 +207,7 @@ public class PolicyController { .flatMap(client -> client.deletePolicy(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(notUsed -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))) + .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) // .onErrorResume(this::handleException); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 6177ee1b..e488af5f 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -226,7 +226,7 @@ public class RefreshConfigTask { return this.a1ClientFactory.createA1Client(ric) // .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) // .collectList() // - .flatMap(list -> Mono.just(ric)) // + .map(list -> ric) // .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) // .doOnError(t -> { logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage()); @@ -243,7 +243,7 @@ public class RefreshConfigTask { logger.debug("RIC added {}", ricId); return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) // - .flatMap(this::addRic) // + .doOnNext(this::addRic) // .flatMap(this::notifyServicesRicAvailable) // .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { @@ -256,7 +256,7 @@ public class RefreshConfigTask { Ric ric = this.rics.get(ricId); if (ric == null) { logger.error("An non existing RIC config is changed, should not happen (just for robustness)"); - addRic(new Ric(updatedInfo.getRicConfig())).block(); + addRic(new Ric(updatedInfo.getRicConfig())); } else { ric.setRicConfig(updatedInfo.getRicConfig()); } @@ -265,14 +265,12 @@ public class RefreshConfigTask { } } - Mono addRic(Ric ric) { + void addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { this.policies.restoreFromDatabase(ric, this.policyTypes); } logger.debug("Added RIC: {}", ric.id()); - - return Mono.just(ric); } private Mono notifyServicesRicAvailable(Ric ric) { @@ -280,7 +278,7 @@ public class RefreshConfigTask { ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); return callbacks.notifyServicesRicAvailable(ric, services) // .collectList() // - .flatMap(list -> Mono.just(ric)); + .map(list -> ric); } else { return Mono.just(ric); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 87689012..b4c6595d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -152,7 +152,7 @@ public class RicSupervision { private Mono createRicData(Ric ric) { return Mono.just(ric) // .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // - .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client))); + .map(a1Client -> new RicData(ric, a1Client)); } private Mono checkRicState(RicData ric) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 0ccccb73..0552df6a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -97,13 +97,12 @@ public class RicSynchronizationTask { } public Mono synchronizeRic(Ric ric) { - return Mono.just(ric) // - .flatMap(notUsed -> setRicState(ric)) // + return setRicState(ric) // .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // .flatMapMany(client -> runSynchronization(ric, client)) // .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) // .collectList() // - .flatMap(notUsed -> Mono.just(ric)) // + .map(notUsed -> ric) // .doOnError(t -> { // logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // ric.setState(RicState.UNAVAILABLE); // @@ -152,7 +151,7 @@ public class RicSynchronizationTask { ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); return callbacks.notifyServicesRicAvailable(ric, services) // .collectList() // - .flatMap(list -> Mono.just(ric)); + .map(list -> ric); } private Flux deleteAllPolicyInstances(Ric ric, Throwable t) { @@ -173,13 +172,13 @@ public class RicSynchronizationTask { return Mono.just(policyTypes.get(policyTypeId)); } return a1Client.getPolicyTypeSchema(policyTypeId) // - .flatMap(schema -> createPolicyType(policyTypeId, schema)); + .map(schema -> createPolicyType(policyTypeId, schema)); } - private Mono createPolicyType(String policyTypeId, String schema) { + private PolicyType createPolicyType(String policyTypeId, String schema) { PolicyType pt = PolicyType.builder().id(policyTypeId).schema(schema).build(); policyTypes.put(pt); - return Mono.just(pt); + return pt; } private void deleteAllPoliciesInRepository(Ric ric) { -- cgit 1.2.3-korg