diff options
Diffstat (limited to 'datafile-dmaap-client/src/main/java')
-rw-r--r-- | datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java | 56 |
1 files changed, 24 insertions, 32 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java index ad9e6fe7..f32b22c4 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -2,31 +2,28 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service.consumer; import java.net.URI; -import java.net.URISyntaxException; +import java.util.function.Consumer; -import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -36,8 +33,6 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; private final String dmaapHostName; private final String dmaapProtocol; @@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient { private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; /** * Constructor of DmaapConsumerReactiveHttpClient. @@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient { * @return reactive response from DMaaP in string format */ public Mono<String> getDmaapConsumerResponse() { - try { - return webClient - .get() - .uri(getUri()) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) + return webClient.get().uri(getUri()).headers(getHeaders()).retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400"))) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.error("Unable to parse URI in message from xNF.", e); - return Mono.error(e); - } + } + + private Consumer<HttpHeaders> getHeaders() { + return httpHeaders -> { + httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType); + }; } private String createRequestPath() { @@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient { return this; } - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); + URI getUri() { + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(createRequestPath()).build(); } } |