diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java | 94 |
1 files changed, 38 insertions, 56 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index 3461876d..959d85e2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * ONAP : ccsdk oran * ====================================================================== - * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. * ====================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,13 +35,14 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.lang.Nullable; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.transport.ProxyProvider; /** * Generic reactive REST client. @@ -54,17 +55,17 @@ public class AsyncRestClient { private static final AtomicInteger sequenceNumber = new AtomicInteger(); private final SslContext sslContext; private final HttpProxyConfig httpProxyConfig; + private final SecurityContext securityContext; - public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) { + public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig, + SecurityContext securityContext) { this.baseUrl = baseUrl; this.sslContext = sslContext; this.httpProxyConfig = httpProxyConfig; + this.securityContext = securityContext; } public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) { - Object traceTag = createTraceTag(); - logger.debug("{} POST uri = '{}{}'", traceTag, baseUrl, uri); - logger.trace("{} POST body: {}", traceTag, body); Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty(); RequestHeadersSpec<?> request = getWebClient() // @@ -72,7 +73,7 @@ public class AsyncRestClient { .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // .body(bodyProducer, String.class); - return retrieve(traceTag, request); + return retrieve(request); } public Mono<String> post(String uri, @Nullable String body) { @@ -81,41 +82,30 @@ public class AsyncRestClient { } public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) { - Object traceTag = createTraceTag(); - logger.debug("{} POST (auth) uri = '{}{}'", traceTag, baseUrl, uri); - logger.trace("{} POST body: {}", traceTag, body); - RequestHeadersSpec<?> request = getWebClient() // .post() // .uri(uri) // .headers(headers -> headers.setBasicAuth(username, password)) // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(body); - return retrieve(traceTag, request) // + return retrieve(request) // .map(this::toBody); } public Mono<ResponseEntity<String>> putForEntity(String uri, String body) { - Object traceTag = createTraceTag(); - logger.debug("{} PUT uri = '{}{}'", traceTag, baseUrl, uri); - logger.trace("{} PUT body: {}", traceTag, body); - RequestHeadersSpec<?> request = getWebClient() // .put() // .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(body); - return retrieve(traceTag, request); + return retrieve(request); } public Mono<ResponseEntity<String>> putForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} PUT uri = '{}{}'", traceTag, baseUrl, uri); - logger.trace("{} PUT body: <empty>", traceTag); RequestHeadersSpec<?> request = getWebClient() // .put() // .uri(uri); - return retrieve(traceTag, request); + return retrieve(request); } public Mono<String> put(String uri, String body) { @@ -124,12 +114,8 @@ public class AsyncRestClient { } public Mono<ResponseEntity<String>> getForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} GET uri = '{}{}'", traceTag, baseUrl, uri); - RequestHeadersSpec<?> request = getWebClient() // - .get() // - .uri(uri); - return retrieve(traceTag, request); + RequestHeadersSpec<?> request = getWebClient().get().uri(uri); + return retrieve(request); } public Mono<String> get(String uri) { @@ -138,12 +124,8 @@ public class AsyncRestClient { } public Mono<ResponseEntity<String>> deleteForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} DELETE uri = '{}{}'", traceTag, baseUrl, uri); - RequestHeadersSpec<?> request = getWebClient() // - .delete() // - .uri(uri); - return retrieve(traceTag, request); + RequestHeadersSpec<?> request = getWebClient().delete().uri(uri); + return retrieve(request); } public Mono<String> delete(String uri) { @@ -151,32 +133,18 @@ public class AsyncRestClient { .map(this::toBody); } - private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) { - final Class<String> clazz = String.class; + private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) { + if (securityContext.isConfigured()) { + request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken())); + } return request.retrieve() // - .toEntity(clazz) // - .doOnNext(entity -> logReceivedData(traceTag, entity)) // - .doOnError(throwable -> onHttpError(traceTag, throwable)); - } - - private void logReceivedData(Object traceTag, ResponseEntity<String> entity) { - logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType()); + .toEntity(String.class); } private static Object createTraceTag() { return sequenceNumber.incrementAndGet(); } - private void onHttpError(Object traceTag, Throwable t) { - if (t instanceof WebClientResponseException) { - WebClientResponseException exception = (WebClientResponseException) t; - logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), - exception.getResponseBodyAsString()); - } else { - logger.debug("{} HTTP error {}", traceTag, t.getMessage()); - } - } - private String toBody(ResponseEntity<String> entity) { if (entity.getBody() == null) { return ""; @@ -203,22 +171,36 @@ public class AsyncRestClient { } if (isHttpProxyConfigured()) { - httpClient = httpClient.proxy(proxy -> proxy.type(httpProxyConfig.httpProxyType()) // - .host(httpProxyConfig.httpProxyHost()) // - .port(httpProxyConfig.httpProxyPort())); + httpClient = httpClient.proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP) + .host(httpProxyConfig.httpProxyHost()).port(httpProxyConfig.httpProxyPort())); } return httpClient; } - private WebClient buildWebClient(String baseUrl) { + public WebClient buildWebClient(String baseUrl) { + Object traceTag = createTraceTag(); + final HttpClient httpClient = buildHttpClient(); ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() // .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) // .build(); + + ExchangeFilterFunction reqLogger = ExchangeFilterFunction.ofRequestProcessor(req -> { + logger.debug("{} {} uri = '{}''", traceTag, req.method(), req.url()); + return Mono.just(req); + }); + + ExchangeFilterFunction respLogger = ExchangeFilterFunction.ofResponseProcessor(resp -> { + logger.debug("{} resp: {}", traceTag, resp.statusCode()); + return Mono.just(resp); + }); + return WebClient.builder() // .clientConnector(new ReactorClientHttpConnector(httpClient)) // .baseUrl(baseUrl) // .exchangeStrategies(exchangeStrategies) // + .filter(reqLogger) // + .filter(respLogger) // .build(); } |