diff options
Diffstat (limited to 'datafile-dmaap-client/src/main/java')
-rw-r--r-- | datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java | 68 |
1 files changed, 27 insertions, 41 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 4b8ce08f..36050ff7 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -21,7 +21,6 @@ import com.google.gson.JsonParser; import java.io.File; import java.net.URI; -import java.util.List; import org.apache.http.HttpHeaders; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; @@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -47,7 +47,8 @@ import reactor.core.publisher.Mono; public class DmaapProducerReactiveHttpClient { private static final String X_ATT_DR_META = "X-ATT-DR-META"; - private static final String LOCATION = "location"; + private static final String NAME_JSON_TAG = "name"; + private static final String LOCATION_JSON_TAG = "location"; private static final String DEFAULT_FEED_ID = "1"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient { /** * Function for calling DMaaP HTTP producer - post request to DMaaP. * - * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @param consumerDmaapModel - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) { - consumerDmaapModelMono.subscribe(models -> postFilesAndData(models)); - return Mono.just(HttpStatus.OK.toString()); - } + public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); - public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } + RequestBodyUriSpec post = webClient.post(); - private void postFilesAndData(List<ConsumerDmaapModel> models) { - for (ConsumerDmaapModel consumerDmaapModel : models) { - postFileAndData(consumerDmaapModel); - } - } + prepareHead(consumerDmaapModel, post); - private void postFileAndData(ConsumerDmaapModel model) { - RequestBodyUriSpec post = webClient.post(); + prepareBody(consumerDmaapModel, post); - boolean headPrepared = prepareHead(model, post); + ResponseSpec responseSpec = post.retrieve(); + responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + Flux<String> response = responseSpec.bodyToFlux(String.class); - if (headPrepared) { - prepareBody(model, post); + logger.trace("Exiting getDmaapProducerResponse with {}", response); + return response; + } - ResponseSpec responseSpec = post.retrieve(); - responseSpec.onStatus(HttpStatus::is4xxClientError, - clientResponse -> handlePostErrors(model, clientResponse)); - responseSpec.onStatus(HttpStatus::is5xxServerError, - clientResponse -> handlePostErrors(model, clientResponse)); - String bodyToMono = responseSpec.bodyToMono(String.class).block(); - logger.debug("File info sent to DR with response: " + bodyToMono); - } + public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; } - private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { - boolean result = true; + private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String location = metaData.getAsJsonObject().remove(LOCATION).getAsString(); + String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); post.header(X_ATT_DR_META, metaData.toString()); - post.uri(getUri(location)); - - return result; + post.uri(getUri(name)); } private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { @@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient { post.body(BodyInserters.fromResource(httpResource)); } - private URI getUri(String location) { - String fileName = location.substring(location.indexOf("/"), location.length()); - String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; - URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName) - .port(dmaapPortNumber).path(path).build(); - return uri; + private URI getUri(String fileName) { + String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(path).build(); } private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { |