diff options
Diffstat (limited to 'prh-dmaap-client/src/main')
-rw-r--r-- | prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java index a99833dc..cb7d5af2 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -17,14 +17,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.service.consumer; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import java.net.URI; import java.net.URISyntaxException; -import java.util.Optional; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.slf4j.Logger; @@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient { private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String dmaapContentType; + private final String dmaapUserName; + private final String dmaapUserPassword; public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { this.dmaapHostName = consumerConfiguration.dmaapHostName(); @@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); - String dmaapContentType = consumerConfiguration.dmaapContentType(); + this.dmaapContentType = consumerConfiguration.dmaapContentType(); + this.dmaapUserName = consumerConfiguration.dmaapUserName(); + this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword(); + } + + public void initWebClient() { this.webClient = WebClient.builder() .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType) - .filter( - basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword())) + .filter(basicAuthentication(dmaapUserName, dmaapUserPassword)) .filter(logRequest()) .filter(logResponse()) .build(); } - public Mono<Optional<String>> getDmaaPConsumerResponse() { + public Mono<String> getDmaaPConsumerResponse() { try { return webClient .get() @@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient { ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) - .bodyToMono(String.class) - .map(Optional::of); + .bodyToMono(String.class); } catch (URISyntaxException e) { logger.warn("Exception while executing HTTP request: ", e); return Mono.error(e); } } - private URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); - } - private String createRequestPath() { return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; } - private ExchangeFilterFunction logResponse() { + void initWebClient(WebClient webClient) { + this.webClient = webClient; + } + + ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { logger.info("Response Status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } - private ExchangeFilterFunction logRequest() { + URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(createRequestPath()).build(); + } + + ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() |