aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java68
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java12
2 files changed, 35 insertions, 45 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 4b8ce08f..36050ff7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,7 +21,6 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.util.List;
import org.apache.http.HttpHeaders;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -47,7 +47,8 @@ import reactor.core.publisher.Mono;
public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
- private static final String LOCATION = "location";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String LOCATION_JSON_TAG = "location";
private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient {
/**
* Function for calling DMaaP HTTP producer - post request to DMaaP.
*
- * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @param consumerDmaapModel - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
- consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
- return Mono.just(HttpStatus.OK.toString());
- }
+ public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
- public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
- this.webClient = webClient;
- return this;
- }
+ RequestBodyUriSpec post = webClient.post();
- private void postFilesAndData(List<ConsumerDmaapModel> models) {
- for (ConsumerDmaapModel consumerDmaapModel : models) {
- postFileAndData(consumerDmaapModel);
- }
- }
+ prepareHead(consumerDmaapModel, post);
- private void postFileAndData(ConsumerDmaapModel model) {
- RequestBodyUriSpec post = webClient.post();
+ prepareBody(consumerDmaapModel, post);
- boolean headPrepared = prepareHead(model, post);
+ ResponseSpec responseSpec = post.retrieve();
+ responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ Flux<String> response = responseSpec.bodyToFlux(String.class);
- if (headPrepared) {
- prepareBody(model, post);
+ logger.trace("Exiting getDmaapProducerResponse with {}", response);
+ return response;
+ }
- ResponseSpec responseSpec = post.retrieve();
- responseSpec.onStatus(HttpStatus::is4xxClientError,
- clientResponse -> handlePostErrors(model, clientResponse));
- responseSpec.onStatus(HttpStatus::is5xxServerError,
- clientResponse -> handlePostErrors(model, clientResponse));
- String bodyToMono = responseSpec.bodyToMono(String.class).block();
- logger.debug("File info sent to DR with response: " + bodyToMono);
- }
+ public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ return this;
}
- private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
- boolean result = true;
+ private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri(location));
-
- return result;
+ post.uri(getUri(name));
}
private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
@@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri(String location) {
- String fileName = location.substring(location.indexOf("/"), location.length());
- String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
- URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
- .port(dmaapPortNumber).path(path).build();
- return uri;
+ private URI getUri(String fileName) {
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(path).build();
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index c0dbf31b..5f4c1a58 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -42,7 +42,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.DefaultUriBuilderFactory;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -52,6 +53,7 @@ class DmaapProducerReactiveHttpClientTest {
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION_JSON_TAG = "location";
+ private static final String NAME_JSON_TAG = "name";
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String HOST = "54.45.33.2";
@@ -98,11 +100,13 @@ class DmaapProducerReactiveHttpClientTest {
List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
consumerDmaapModelList.add(consumerDmaapModel);
- dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList));
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ .expectNext("200").verifyComplete();
verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG);
verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT)
.path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build();
@@ -116,7 +120,7 @@ class DmaapProducerReactiveHttpClientTest {
when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock);
when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock);
- Mono<String> expectedResult = Mono.just("200");
- when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult);
+ Flux<String> expectedResult = Flux.just("200");
+ when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult);
}
}