diff options
Diffstat (limited to 'datafile-dmaap-client/src/main')
2 files changed, 39 insertions, 49 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java index ad9e6fe7..f32b22c4 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -2,31 +2,28 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service.consumer; import java.net.URI; -import java.net.URISyntaxException; +import java.util.function.Consumer; -import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -36,8 +33,6 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; private final String dmaapHostName; private final String dmaapProtocol; @@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient { private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; /** * Constructor of DmaapConsumerReactiveHttpClient. @@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient { * @return reactive response from DMaaP in string format */ public Mono<String> getDmaapConsumerResponse() { - try { - return webClient - .get() - .uri(getUri()) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) + return webClient.get().uri(getUri()).headers(getHeaders()).retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400"))) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.error("Unable to parse URI in message from xNF.", e); - return Mono.error(e); - } + } + + private Consumer<HttpHeaders> getHeaders() { + return httpHeaders -> { + httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType); + }; } private String createRequestPath() { @@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient { return this; } - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); + URI getUri() { + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(createRequestPath()).build(); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index fd3c0c84..4b8ce08f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -21,11 +21,9 @@ import com.google.gson.JsonParser; import java.io.File; import java.net.URI; -import java.net.URISyntaxException; import java.util.List; import org.apache.http.HttpHeaders; -import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; @@ -38,6 +36,7 @@ import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -49,14 +48,15 @@ public class DmaapProducerReactiveHttpClient { private static final String X_ATT_DR_META = "X-ATT-DR-META"; private static final String LOCATION = "location"; + private static final String DEFAULT_FEED_ID = "1"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); private WebClient webClient; private final String dmaapHostName; private final Integer dmaapPortNumber; - private final String dmaapProtocol; private final String dmaapTopicName; + private final String dmaapProtocol; private final String dmaapContentType; /** @@ -67,9 +67,9 @@ public class DmaapProducerReactiveHttpClient { public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); - this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); } @@ -115,18 +115,13 @@ public class DmaapProducerReactiveHttpClient { private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { boolean result = true; - try { - post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); + post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - metaData.getAsJsonObject().remove(LOCATION); - post.header(X_ATT_DR_META, metaData.toString()); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + String location = metaData.getAsJsonObject().remove(LOCATION).getAsString(); + post.header(X_ATT_DR_META, metaData.toString()); - post.uri(getUri()); - } catch (Exception e) { - logger.error("Unable to post file to Data Router. " + model, e); - result = false; - } + post.uri(getUri(location)); return result; } @@ -138,9 +133,12 @@ public class DmaapProducerReactiveHttpClient { post.body(BodyInserters.fromResource(httpResource)); } - private URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(dmaapTopicName).build(); + private URI getUri(String location) { + String fileName = location.substring(location.indexOf("/"), location.length()); + String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; + URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName) + .port(dmaapPortNumber).path(path).build(); + return uri; } private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { |