aboutsummaryrefslogtreecommitdiffstats
path: root/prh-dmaap-client
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
committerpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
commit83df6e1df5ec20627c85af9ba2f49036dd58f328 (patch)
treede9282995bc4c7b0d0f277760b1d6f3574970794 /prh-dmaap-client
parent3c2766d8a64d21f402b5234e33419a8aed14d7ea (diff)
Refatoring due to prh workflow
1. Added specified HttpClient for DmaaPPublisher: *DmaaP Handle transfer-encoding: chunk header and reject the request if it will be set by the client. In conclusion no other reactive http client can be used for pushing something to dmaap. 2. Added sll support to A&AI rective webclient. *Behaviour of reactive A&AI HttpClient is different as in native spring have without it. 3. Added 10s fixed time in PRH for requesting DmaaP. 4. Added debug log in reactive/native http clients. 5. Fixed reactive workflow of prh. 6. Updated the version of: * spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE * spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE * spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE * reactor-bom:Bismuth-RELEASE->Bismuth-SR10 Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8 Issue-ID: DCAEGEN2-743 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-dmaap-client')
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java7
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java12
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java57
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java7
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java58
5 files changed, 66 insertions, 75 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
index 8ce81757..4327dfbf 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.prh.service;
-import java.util.HashMap;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserName;
private String dmaaPUserPassword;
- private String dmaaPContentType;
/**
* Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
@@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient {
public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
- this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
-
return this;
}
@@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient {
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
.filter(logRequest())
.filter(logResponse())
.build();
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
index ac13dd61..f9a66378 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -29,9 +29,8 @@ import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -41,13 +40,13 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final String dmaapHostName;
private final String dmaapProtocol;
private final Integer dmaapPortNumber;
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
private WebClient webClient;
/**
@@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient {
.uri(getUri())
.header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
.header(X_INVOCATION_ID, UUID.randomUUID().toString())
+ .header(HttpHeaders.CONTENT_TYPE, contentType)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
return Mono.error(e);
}
}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index d049d380..5c72b38c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service.producer;
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
@@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -44,11 +47,13 @@ import reactor.core.publisher.Mono;
public class DMaaPProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
private final String dmaapTopicName;
- private WebClient webClient;
+ private final String dmaapContentType;
+ private RestTemplate restTemplate;
/**
* Constructor DMaaPProducerReactiveHttpClient.
@@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient {
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
/**
@@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
- .header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
+
+ public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ return Mono.defer(() -> {
+ try {
+ HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+ } catch (URISyntaxException e) {
+ logger.warn("Exception while evaluating URI");
+ return Mono.error(e);
+ }
+ });
+ }
+
+ private HttpHeaders getAllHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ return headers;
+
}
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
+ public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
return this;
}
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
index 1a237562..9f693701 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
@@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"));
+ }
+
private void mockDependantObjects() {
when(webClient.get()).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
index e8af8cd9..05b74895 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
@@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-import reactor.core.publisher.Mono;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
+
class DMaaPProducerReactiveHttpClientTest {
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest {
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
DmaapPublisherConfiguration.class);
private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
- private WebClient webClient = mock(WebClient.class);
- private RequestBodyUriSpec requestBodyUriSpec;
- private ResponseSpec responseSpec;
@BeforeEach
@@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
-
+ when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY");
dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
- webClient = spy(WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
- .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
- dmaapPublisherConfigurationMock.dmaapUserPassword()))
- .build());
- requestBodyUriSpec = mock(RequestBodyUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
}
@Test
void getHttpResponse_Success() {
//given
- Integer responseSuccess = 200;
- Mono<Integer> expectedResult = Mono.just(responseSuccess);
-
+ int responseSuccess = 200;
+ ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class);
+ RestTemplate restTemplate = mock(RestTemplate.class);
//when
- mockWebClientDependantObject();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess));
+ doReturn(mockedResponseEntity).when(restTemplate)
+ .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any());
+ dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate);
//then
- Assertions.assertEquals(response.block(), expectedResult.block());
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
+ .expectSubscription().expectNext(mockedResponseEntity).verifyComplete();
}
@Test
@@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest {
//given
dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient);
//when
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
//then
@@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
- private void mockWebClientDependantObject() {
- RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
- doReturn(responseSpec).when(requestHeadersSpec).retrieve();
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY"));
}
} \ No newline at end of file