diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-06-26 15:15:03 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-07 08:21:02 +0200 |
commit | 8b1502fb0f1af5d00ec26e712e57b792fbd16bd8 (patch) | |
tree | 7cc80c278f17710863e6d865df77c5edfa6d4fbc /prh-dmaap-client/src/main | |
parent | e17c2d89d0470501fa60ed487726b0bbf3305f8c (diff) |
Added dmaapReactiveConsumer
*Tests have not been ready yet
Change-Id: I2e1d9c4218f91ae2f066b28acdbaa1870d7d27e7
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-dmaap-client/src/main')
2 files changed, 111 insertions, 1 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java new file mode 100644 index 00000000..859784a8 --- /dev/null +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.service.consumer; + +import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18 + */ +public class DmaapConsumerReactiveHttpClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final String dmaapProtocol; + private final Integer dmaapPortNumber; + private final String dmaapTopicName; + private final String consumerGroup; + private final String consumerId; + + public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { + this.dmaapHostName = consumerConfiguration.dmaapHostName(); + this.dmaapProtocol = consumerConfiguration.dmaapProtocol(); + this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber(); + this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); + this.consumerGroup = consumerConfiguration.consumerGroup(); + this.consumerId = consumerConfiguration.consumerId(); + String dmaapContentType = consumerConfiguration.dmaapContentType(); + this.webClient = WebClient.builder() + .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType) + .filter( + basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword())) + .filter(logRequest()) + .filter(logResponse()) + .build(); + } + + public Mono<String> getDmaaPConsumerResposne() { + try { + return webClient + .get() + .uri(getUri()) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> + Mono.error(new Exception("HTTP 400")) + ) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) + .bodyToMono(String.class); + } catch (URISyntaxException e) { + logger.warn("Exception while executing HTTP request: ", e); + return Mono.error(e); + } + } + + private URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(createRequestPath()).build(); + } + + private String createRequestPath() { + return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; + } + + private ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response Status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } +} diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java index a80194d5..0f8fe28c 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java @@ -1,4 +1,4 @@ -/*- +/* * ============LICENSE_START======================================================= * PNF-REGISTRATION-HANDLER * ================================================================================ |