diff options
Diffstat (limited to 'datafile-dmaap-client')
3 files changed, 36 insertions, 48 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java index f32b22c4..c4bf1611 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -70,9 +70,7 @@ public class DmaapConsumerReactiveHttpClient { } private Consumer<HttpHeaders> getHeaders() { - return httpHeaders -> { - httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType); - }; + return httpHeaders -> httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType); } private String createRequestPath() { diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 4b8ce08f..36050ff7 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -21,7 +21,6 @@ import com.google.gson.JsonParser; import java.io.File; import java.net.URI; -import java.util.List; import org.apache.http.HttpHeaders; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; @@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -47,7 +47,8 @@ import reactor.core.publisher.Mono; public class DmaapProducerReactiveHttpClient { private static final String X_ATT_DR_META = "X-ATT-DR-META"; - private static final String LOCATION = "location"; + private static final String NAME_JSON_TAG = "name"; + private static final String LOCATION_JSON_TAG = "location"; private static final String DEFAULT_FEED_ID = "1"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient { /** * Function for calling DMaaP HTTP producer - post request to DMaaP. * - * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @param consumerDmaapModel - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) { - consumerDmaapModelMono.subscribe(models -> postFilesAndData(models)); - return Mono.just(HttpStatus.OK.toString()); - } + public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); - public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } + RequestBodyUriSpec post = webClient.post(); - private void postFilesAndData(List<ConsumerDmaapModel> models) { - for (ConsumerDmaapModel consumerDmaapModel : models) { - postFileAndData(consumerDmaapModel); - } - } + prepareHead(consumerDmaapModel, post); - private void postFileAndData(ConsumerDmaapModel model) { - RequestBodyUriSpec post = webClient.post(); + prepareBody(consumerDmaapModel, post); - boolean headPrepared = prepareHead(model, post); + ResponseSpec responseSpec = post.retrieve(); + responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + Flux<String> response = responseSpec.bodyToFlux(String.class); - if (headPrepared) { - prepareBody(model, post); + logger.trace("Exiting getDmaapProducerResponse with {}", response); + return response; + } - ResponseSpec responseSpec = post.retrieve(); - responseSpec.onStatus(HttpStatus::is4xxClientError, - clientResponse -> handlePostErrors(model, clientResponse)); - responseSpec.onStatus(HttpStatus::is5xxServerError, - clientResponse -> handlePostErrors(model, clientResponse)); - String bodyToMono = responseSpec.bodyToMono(String.class).block(); - logger.debug("File info sent to DR with response: " + bodyToMono); - } + public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; } - private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { - boolean result = true; + private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String location = metaData.getAsJsonObject().remove(LOCATION).getAsString(); + String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); post.header(X_ATT_DR_META, metaData.toString()); - post.uri(getUri(location)); - - return result; + post.uri(getUri(name)); } private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { @@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient { post.body(BodyInserters.fromResource(httpResource)); } - private URI getUri(String location) { - String fileName = location.substring(location.indexOf("/"), location.length()); - String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; - URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName) - .port(dmaapPortNumber).path(path).build(); - return uri; + private URI getUri(String fileName) { + String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(path).build(); } private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index c0dbf31b..5f4c1a58 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -42,7 +42,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.util.DefaultUriBuilderFactory; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -52,6 +53,7 @@ class DmaapProducerReactiveHttpClientTest { private static final String FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String LOCATION_JSON_TAG = "location"; + private static final String NAME_JSON_TAG = "name"; private static final String X_ATT_DR_META = "X-ATT-DR-META"; private static final String HOST = "54.45.33.2"; @@ -98,11 +100,13 @@ class DmaapProducerReactiveHttpClientTest { List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>(); consumerDmaapModelList.add(consumerDmaapModel); - dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList)); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) + .expectNext("200").verifyComplete(); verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); + metaData.getAsJsonObject().remove(NAME_JSON_TAG); verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString()); URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT) .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build(); @@ -116,7 +120,7 @@ class DmaapProducerReactiveHttpClientTest { when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock); when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock); - Mono<String> expectedResult = Mono.just("200"); - when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult); + Flux<String> expectedResult = Flux.just("200"); + when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult); } } |