aboutsummaryrefslogtreecommitdiffstats
path: root/prh-dmaap-client
diff options
context:
space:
mode:
Diffstat (limited to 'prh-dmaap-client')
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java7
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java12
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java57
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java7
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java58
5 files changed, 66 insertions, 75 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
index 8ce81757..4327dfbf 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.prh.service;
-import java.util.HashMap;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserName;
private String dmaaPUserPassword;
- private String dmaaPContentType;
/**
* Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
@@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient {
public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
- this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
-
return this;
}
@@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient {
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
.filter(logRequest())
.filter(logResponse())
.build();
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
index ac13dd61..f9a66378 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -29,9 +29,8 @@ import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -41,13 +40,13 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final String dmaapHostName;
private final String dmaapProtocol;
private final Integer dmaapPortNumber;
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
private WebClient webClient;
/**
@@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient {
.uri(getUri())
.header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
.header(X_INVOCATION_ID, UUID.randomUUID().toString())
+ .header(HttpHeaders.CONTENT_TYPE, contentType)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
return Mono.error(e);
}
}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index d049d380..5c72b38c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service.producer;
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
@@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -44,11 +47,13 @@ import reactor.core.publisher.Mono;
public class DMaaPProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
private final String dmaapTopicName;
- private WebClient webClient;
+ private final String dmaapContentType;
+ private RestTemplate restTemplate;
/**
* Constructor DMaaPProducerReactiveHttpClient.
@@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient {
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
/**
@@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
- .header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
+
+ public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ return Mono.defer(() -> {
+ try {
+ HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+ } catch (URISyntaxException e) {
+ logger.warn("Exception while evaluating URI");
+ return Mono.error(e);
+ }
+ });
+ }
+
+ private HttpHeaders getAllHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ return headers;
+
}
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
+ public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
return this;
}
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
index 1a237562..9f693701 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
@@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"));
+ }
+
private void mockDependantObjects() {
when(webClient.get()).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
index e8af8cd9..05b74895 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
@@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-import reactor.core.publisher.Mono;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
+
class DMaaPProducerReactiveHttpClientTest {
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest {
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
DmaapPublisherConfiguration.class);
private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
- private WebClient webClient = mock(WebClient.class);
- private RequestBodyUriSpec requestBodyUriSpec;
- private ResponseSpec responseSpec;
@BeforeEach
@@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
-
+ when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY");
dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
- webClient = spy(WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
- .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
- dmaapPublisherConfigurationMock.dmaapUserPassword()))
- .build());
- requestBodyUriSpec = mock(RequestBodyUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
}
@Test
void getHttpResponse_Success() {
//given
- Integer responseSuccess = 200;
- Mono<Integer> expectedResult = Mono.just(responseSuccess);
-
+ int responseSuccess = 200;
+ ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class);
+ RestTemplate restTemplate = mock(RestTemplate.class);
//when
- mockWebClientDependantObject();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess));
+ doReturn(mockedResponseEntity).when(restTemplate)
+ .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any());
+ dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate);
//then
- Assertions.assertEquals(response.block(), expectedResult.block());
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
+ .expectSubscription().expectNext(mockedResponseEntity).verifyComplete();
}
@Test
@@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest {
//given
dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient);
//when
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
//then
@@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
- private void mockWebClientDependantObject() {
- RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
- doReturn(responseSpec).when(requestHeadersSpec).retrieve();
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY"));
}
} \ No newline at end of file