summaryrefslogtreecommitdiffstats
path: root/prh-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'prh-dmaap-client/src/main')
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java7
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java12
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java57
3 files changed, 38 insertions, 38 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
index 8ce81757..4327dfbf 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.prh.service;
-import java.util.HashMap;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserName;
private String dmaaPUserPassword;
- private String dmaaPContentType;
/**
* Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
@@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient {
public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
- this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
-
return this;
}
@@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient {
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
.filter(logRequest())
.filter(logResponse())
.build();
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
index ac13dd61..f9a66378 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -29,9 +29,8 @@ import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -41,13 +40,13 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final String dmaapHostName;
private final String dmaapProtocol;
private final Integer dmaapPortNumber;
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
private WebClient webClient;
/**
@@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient {
.uri(getUri())
.header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
.header(X_INVOCATION_ID, UUID.randomUUID().toString())
+ .header(HttpHeaders.CONTENT_TYPE, contentType)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
return Mono.error(e);
}
}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index d049d380..5c72b38c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service.producer;
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
@@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -44,11 +47,13 @@ import reactor.core.publisher.Mono;
public class DMaaPProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
private final String dmaapTopicName;
- private WebClient webClient;
+ private final String dmaapContentType;
+ private RestTemplate restTemplate;
/**
* Constructor DMaaPProducerReactiveHttpClient.
@@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient {
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
/**
@@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
- .header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
+
+ public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ return Mono.defer(() -> {
+ try {
+ HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+ } catch (URISyntaxException e) {
+ logger.warn("Exception while evaluating URI");
+ return Mono.error(e);
+ }
+ });
+ }
+
+ private HttpHeaders getAllHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ return headers;
+
}
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
+ public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
return this;
}