aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main/java/org
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-09-26 16:03:45 +0200
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-09-27 14:30:56 +0200
commit263d1a481bd5378e59b804425bc8a9bdc9bebb9a (patch)
tree4825c45fbccf1f99e3765417118c8a74bd9fe990 /datafile-dmaap-client/src/main/java/org
parentd1e9b1e4004faf059c2505ef9ab561bf19472b98 (diff)
Fix delivery to DataRouter
The messages to the DataRouter was not actually sent. Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f Issue-ID: DCAEGEN2-841 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-dmaap-client/src/main/java/org')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java68
1 files changed, 27 insertions, 41 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 4b8ce08f..36050ff7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,7 +21,6 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.util.List;
import org.apache.http.HttpHeaders;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -47,7 +47,8 @@ import reactor.core.publisher.Mono;
public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
- private static final String LOCATION = "location";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String LOCATION_JSON_TAG = "location";
private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient {
/**
* Function for calling DMaaP HTTP producer - post request to DMaaP.
*
- * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @param consumerDmaapModel - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
- consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
- return Mono.just(HttpStatus.OK.toString());
- }
+ public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
- public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
- this.webClient = webClient;
- return this;
- }
+ RequestBodyUriSpec post = webClient.post();
- private void postFilesAndData(List<ConsumerDmaapModel> models) {
- for (ConsumerDmaapModel consumerDmaapModel : models) {
- postFileAndData(consumerDmaapModel);
- }
- }
+ prepareHead(consumerDmaapModel, post);
- private void postFileAndData(ConsumerDmaapModel model) {
- RequestBodyUriSpec post = webClient.post();
+ prepareBody(consumerDmaapModel, post);
- boolean headPrepared = prepareHead(model, post);
+ ResponseSpec responseSpec = post.retrieve();
+ responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ Flux<String> response = responseSpec.bodyToFlux(String.class);
- if (headPrepared) {
- prepareBody(model, post);
+ logger.trace("Exiting getDmaapProducerResponse with {}", response);
+ return response;
+ }
- ResponseSpec responseSpec = post.retrieve();
- responseSpec.onStatus(HttpStatus::is4xxClientError,
- clientResponse -> handlePostErrors(model, clientResponse));
- responseSpec.onStatus(HttpStatus::is5xxServerError,
- clientResponse -> handlePostErrors(model, clientResponse));
- String bodyToMono = responseSpec.bodyToMono(String.class).block();
- logger.debug("File info sent to DR with response: " + bodyToMono);
- }
+ public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ return this;
}
- private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
- boolean result = true;
+ private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri(location));
-
- return result;
+ post.uri(getUri(name));
}
private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
@@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri(String location) {
- String fileName = location.substring(location.indexOf("/"), location.length());
- String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
- URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
- .port(dmaapPortNumber).path(path).build();
- return uri;
+ private URI getUri(String fileName) {
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(path).build();
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {