aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src/main/java')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java32
1 files changed, 15 insertions, 17 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index fd3c0c84..4b8ce08f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,11 +21,9 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import org.apache.http.HttpHeaders;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -38,6 +36,7 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -49,14 +48,15 @@ public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String LOCATION = "location";
+ private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private WebClient webClient;
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
private final String dmaapTopicName;
+ private final String dmaapProtocol;
private final String dmaapContentType;
/**
@@ -67,9 +67,9 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
@@ -115,18 +115,13 @@ public class DmaapProducerReactiveHttpClient {
private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
boolean result = true;
- try {
- post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- metaData.getAsJsonObject().remove(LOCATION);
- post.header(X_ATT_DR_META, metaData.toString());
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri());
- } catch (Exception e) {
- logger.error("Unable to post file to Data Router. " + model, e);
- result = false;
- }
+ post.uri(getUri(location));
return result;
}
@@ -138,9 +133,12 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(dmaapTopicName).build();
+ private URI getUri(String location) {
+ String fileName = location.substring(location.indexOf("/"), location.length());
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
+ .port(dmaapPortNumber).path(path).build();
+ return uri;
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {