diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-06-27 14:29:06 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-07 09:18:01 +0200 |
commit | 79984d737c71d3c92f3cd283eaf2b9b6157c2ce2 (patch) | |
tree | d467284687fa9db1ee15ead8a2f94594ec609a05 /prh-dmaap-client/src/main | |
parent | c8c9a242f7a1f8454e2cf94b0442128533569dc5 (diff) |
Refactor Optional in MonoResponse
Added JUnit tests for DmaapConsumer
Reactive Client.
Added property for spring to run
PRH as none-web application
Change-Id: I8b762389927151387da5b79e8b75b670600ee5e8
Issue-ID: DCAEGEN2-563
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-dmaap-client/src/main')
-rw-r--r-- | prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java index a99833dc..cb7d5af2 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -17,14 +17,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.service.consumer; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import java.net.URI; import java.net.URISyntaxException; -import java.util.Optional; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.slf4j.Logger; @@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient { private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String dmaapContentType; + private final String dmaapUserName; + private final String dmaapUserPassword; public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { this.dmaapHostName = consumerConfiguration.dmaapHostName(); @@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); - String dmaapContentType = consumerConfiguration.dmaapContentType(); + this.dmaapContentType = consumerConfiguration.dmaapContentType(); + this.dmaapUserName = consumerConfiguration.dmaapUserName(); + this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword(); + } + + public void initWebClient() { this.webClient = WebClient.builder() .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType) - .filter( - basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword())) + .filter(basicAuthentication(dmaapUserName, dmaapUserPassword)) .filter(logRequest()) .filter(logResponse()) .build(); } - public Mono<Optional<String>> getDmaaPConsumerResponse() { + public Mono<String> getDmaaPConsumerResponse() { try { return webClient .get() @@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient { ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) - .bodyToMono(String.class) - .map(Optional::of); + .bodyToMono(String.class); } catch (URISyntaxException e) { logger.warn("Exception while executing HTTP request: ", e); return Mono.error(e); } } - private URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); - } - private String createRequestPath() { return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; } - private ExchangeFilterFunction logResponse() { + void initWebClient(WebClient webClient) { + this.webClient = webClient; + } + + ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { logger.info("Response Status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } - private ExchangeFilterFunction logRequest() { + URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(createRequestPath()).build(); + } + + ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() |