aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java7
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java96
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java6
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java12
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java13
7 files changed, 63 insertions, 75 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
index c72f196a..d4c264da 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
@@ -96,12 +96,11 @@ class A1AdapterJsonHelper {
public static Mono<String> getValueFromResponse(String response, String key) {
return getOutput(response) //
- .flatMap(responseParams -> {
+ .map(responseParams -> {
if (!responseParams.has(key)) {
- return Mono.just("");
+ return "";
}
- String value = responseParams.get(key).toString();
- return Mono.just(value);
+ return responseParams.get(key).toString();
});
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
index c3be9b4d..4f2770bf 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
@@ -66,96 +66,89 @@ public class AsyncRestClient {
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .get() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .delete() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -184,11 +177,11 @@ public class AsyncRestClient {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
@@ -229,11 +222,10 @@ public class AsyncRestClient {
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
-
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java
index 3d2b55a2..bead6d11 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java
@@ -205,7 +205,7 @@ public class PolicyController {
.flatMap(client -> client.deletePolicy(policy)) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
.doOnError(notUsed -> ric.getLock().unlockBlocking()) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)))
+ .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) //
.onErrorResume(this::handleException);
}
@@ -273,7 +273,7 @@ public class PolicyController {
.doOnNext(notUsed -> policies.put(policy)) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
.doOnError(trowable -> ric.getLock().unlockBlocking()) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .map(notUsed -> new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(this::handleException);
}
@@ -406,7 +406,7 @@ public class PolicyController {
return a1ClientFactory.createA1Client(policy.getRic()) //
.flatMap(client -> client.getPolicyStatus(policy)) //
- .flatMap(status -> Mono.just(new ResponseEntity<>(status, HttpStatus.OK)))
+ .map(status -> new ResponseEntity<>(status, HttpStatus.OK)) //
.onErrorResume(this::handleException);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
index 036958d3..134d6d7a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
@@ -207,7 +207,7 @@ public class PolicyController {
.flatMap(client -> client.deletePolicy(policy)) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
.doOnError(notUsed -> ric.getLock().unlockBlocking()) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)))
+ .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) //
.onErrorResume(this::handleException);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 6177ee1b..e488af5f 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -226,7 +226,7 @@ public class RefreshConfigTask {
return this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
.collectList() //
- .flatMap(list -> Mono.just(ric)) //
+ .map(list -> ric) //
.doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
.doOnError(t -> {
logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
@@ -243,7 +243,7 @@ public class RefreshConfigTask {
logger.debug("RIC added {}", ricId);
return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
- .flatMap(this::addRic) //
+ .doOnNext(this::addRic) //
.flatMap(this::notifyServicesRicAvailable) //
.flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
@@ -256,7 +256,7 @@ public class RefreshConfigTask {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(new Ric(updatedInfo.getRicConfig())).block();
+ addRic(new Ric(updatedInfo.getRicConfig()));
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
@@ -265,14 +265,12 @@ public class RefreshConfigTask {
}
}
- Mono<Ric> addRic(Ric ric) {
+ void addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
logger.debug("Added RIC: {}", ric.id());
-
- return Mono.just(ric);
}
private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
@@ -280,7 +278,7 @@ public class RefreshConfigTask {
ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
return callbacks.notifyServicesRicAvailable(ric, services) //
.collectList() //
- .flatMap(list -> Mono.just(ric));
+ .map(list -> ric);
} else {
return Mono.just(ric);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index 87689012..b4c6595d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -152,7 +152,7 @@ public class RicSupervision {
private Mono<RicData> createRicData(Ric ric) {
return Mono.just(ric) //
.flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client)));
+ .map(a1Client -> new RicData(ric, a1Client));
}
private Mono<RicData> checkRicState(RicData ric) {
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 0ccccb73..0552df6a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -97,13 +97,12 @@ public class RicSynchronizationTask {
}
public Mono<Ric> synchronizeRic(Ric ric) {
- return Mono.just(ric) //
- .flatMap(notUsed -> setRicState(ric)) //
+ return setRicState(ric) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
.onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
.collectList() //
- .flatMap(notUsed -> Mono.just(ric)) //
+ .map(notUsed -> ric) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
ric.setState(RicState.UNAVAILABLE); //
@@ -152,7 +151,7 @@ public class RicSynchronizationTask {
ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
return callbacks.notifyServicesRicAvailable(ric, services) //
.collectList() //
- .flatMap(list -> Mono.just(ric));
+ .map(list -> ric);
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
@@ -173,13 +172,13 @@ public class RicSynchronizationTask {
return Mono.just(policyTypes.get(policyTypeId));
}
return a1Client.getPolicyTypeSchema(policyTypeId) //
- .flatMap(schema -> createPolicyType(policyTypeId, schema));
+ .map(schema -> createPolicyType(policyTypeId, schema));
}
- private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+ private PolicyType createPolicyType(String policyTypeId, String schema) {
PolicyType pt = PolicyType.builder().id(policyTypeId).schema(schema).build();
policyTypes.put(pt);
- return Mono.just(pt);
+ return pt;
}
private void deleteAllPoliciesInRepository(Ric ric) {