aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java56
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java32
2 files changed, 39 insertions, 49 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index ad9e6fe7..f32b22c4 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -2,31 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.function.Consumer;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -36,8 +33,6 @@ import reactor.core.publisher.Mono;
*/
public class DmaapConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private WebClient webClient;
private final String dmaapHostName;
private final String dmaapProtocol;
@@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
/**
* Constructor of DmaapConsumerReactiveHttpClient.
@@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient {
* @return reactive response from DMaaP in string format
*/
public Mono<String> getDmaapConsumerResponse() {
- try {
- return webClient
- .get()
- .uri(getUri())
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
+ .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.error("Unable to parse URI in message from xNF.", e);
- return Mono.error(e);
- }
+ }
+
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
}
private String createRequestPath() {
@@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient {
return this;
}
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(createRequestPath()).build();
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index fd3c0c84..4b8ce08f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,11 +21,9 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import org.apache.http.HttpHeaders;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -38,6 +36,7 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -49,14 +48,15 @@ public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String LOCATION = "location";
+ private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private WebClient webClient;
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
private final String dmaapTopicName;
+ private final String dmaapProtocol;
private final String dmaapContentType;
/**
@@ -67,9 +67,9 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
@@ -115,18 +115,13 @@ public class DmaapProducerReactiveHttpClient {
private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
boolean result = true;
- try {
- post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- metaData.getAsJsonObject().remove(LOCATION);
- post.header(X_ATT_DR_META, metaData.toString());
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri());
- } catch (Exception e) {
- logger.error("Unable to post file to Data Router. " + model, e);
- result = false;
- }
+ post.uri(getUri(location));
return result;
}
@@ -138,9 +133,12 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(dmaapTopicName).build();
+ private URI getUri(String location) {
+ String fileName = location.substring(location.indexOf("/"), location.length());
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
+ .port(dmaapPortNumber).path(path).build();
+ return uri;
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {