summaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main/java
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-09-19 08:49:28 +0200
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-09-21 16:46:38 +0200
commit90b9931873e13247e937a6d4b5206c65033b306e (patch)
tree23e667e9d68958edc49a94bb5a02778a6cc45713 /datafile-dmaap-client/src/main/java
parent3468d474187ef01546bdf1180d11453a4f924d31 (diff)
Use correct Content Type from DMaaP
Add the Content Type to the header the DMaaP consumer uses to get the fileReady event from DMAaaP to prevent formatting problems. Change-Id: Iedf38b7542e5709a78f383d31c75e7b95aa56cfe Issue-ID: DCAEGEN2-825 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-dmaap-client/src/main/java')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java56
1 files changed, 24 insertions, 32 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index ad9e6fe7..f32b22c4 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -2,31 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.function.Consumer;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -36,8 +33,6 @@ import reactor.core.publisher.Mono;
*/
public class DmaapConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private WebClient webClient;
private final String dmaapHostName;
private final String dmaapProtocol;
@@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
/**
* Constructor of DmaapConsumerReactiveHttpClient.
@@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient {
* @return reactive response from DMaaP in string format
*/
public Mono<String> getDmaapConsumerResponse() {
- try {
- return webClient
- .get()
- .uri(getUri())
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
+ .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.error("Unable to parse URI in message from xNF.", e);
- return Mono.error(e);
- }
+ }
+
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
}
private String createRequestPath() {
@@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient {
return this;
}
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(createRequestPath()).build();
}
}