diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2018-09-04 09:29:49 +0200 |
---|---|---|
committer | pwielebs <piotr.wielebski@nokia.com> | 2018-09-04 09:29:49 +0200 |
commit | 83df6e1df5ec20627c85af9ba2f49036dd58f328 (patch) | |
tree | de9282995bc4c7b0d0f277760b1d6f3574970794 /prh-dmaap-client/src | |
parent | 3c2766d8a64d21f402b5234e33419a8aed14d7ea (diff) |
Refatoring due to prh workflow
1. Added specified HttpClient for DmaaPPublisher:
*DmaaP Handle transfer-encoding: chunk header and
reject the request if it will be set by the client.
In conclusion no other reactive http client can be
used for pushing something to dmaap.
2. Added sll support to A&AI rective webclient.
*Behaviour of reactive A&AI HttpClient is different as
in native spring have without it.
3. Added 10s fixed time in PRH for requesting DmaaP.
4. Added debug log in reactive/native http clients.
5. Fixed reactive workflow of prh.
6. Updated the version of:
* spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE
* spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE
* spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE
* reactor-bom:Bismuth-RELEASE->Bismuth-SR10
Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8
Issue-ID: DCAEGEN2-743
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-dmaap-client/src')
5 files changed, 66 insertions, 75 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java index 8ce81757..4327dfbf 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java @@ -20,13 +20,10 @@ package org.onap.dcaegen2.services.prh.service; -import java.util.HashMap; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient { private String dmaaPUserName; private String dmaaPUserPassword; - private String dmaaPContentType; /** * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. @@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient { public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); - this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); - return this; } @@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient { */ public WebClient build() { return WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) .filter(logRequest()) .filter(logResponse()) .build(); diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java index ac13dd61..f9a66378 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java @@ -29,9 +29,8 @@ import java.net.URISyntaxException; import java.util.UUID; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -41,13 +40,13 @@ import reactor.core.publisher.Mono; */ public class DMaaPConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final String dmaapHostName; private final String dmaapProtocol; private final Integer dmaapPortNumber; private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; private WebClient webClient; /** @@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient { .uri(getUri()) .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) .header(X_INVOCATION_ID, UUID.randomUUID().toString()) + .header(HttpHeaders.CONTENT_TYPE, contentType) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())) ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))) .bodyToMono(String.class); } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI "); return Mono.error(e); } } diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java index d049d380..5c72b38c 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.service.producer; +import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID; @@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -44,11 +47,13 @@ import reactor.core.publisher.Mono; public class DMaaPProducerReactiveHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final String dmaapHostName; private final Integer dmaapPortNumber; private final String dmaapProtocol; private final String dmaapTopicName; - private WebClient webClient; + private final String dmaapContentType; + private RestTemplate restTemplate; /** * Constructor DMaaPProducerReactiveHttpClient. @@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient { this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); } /** @@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient { * @param consumerDmaapModelMono - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { - try { - return webClient - .post() - .uri(getUri()) - .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) - .header(X_INVOCATION_ID, UUID.randomUUID().toString()) - .body(BodyInserters.fromObject(consumerDmaapModelMono)) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode())) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode()))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI"); - return Mono.error(e); - } + + public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { + return Mono.defer(() -> { + try { + HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders()); + return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class)); + } catch (URISyntaxException e) { + logger.warn("Exception while evaluating URI"); + return Mono.error(e); + } + }); + } + + private HttpHeaders getAllHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)); + headers.set(X_INVOCATION_ID, UUID.randomUUID().toString()); + headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType); + return headers; + } - public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; + public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) { + this.restTemplate = restTemplate; return this; } diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java index 1a237562..9f693701 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java @@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest { .expectError(Exception.class).verify(); } + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12")); + } + private void mockDependantObjects() { when(webClient.get()).thenReturn(requestHeadersSpec); when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec); diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java index e8af8cd9..05b74895 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java @@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; -import reactor.core.publisher.Mono; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 */ + class DMaaPProducerReactiveHttpClientTest { private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest { private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock( DmaapPublisherConfiguration.class); private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest(); - private WebClient webClient = mock(WebClient.class); - private RequestBodyUriSpec requestBodyUriSpec; - private ResponseSpec responseSpec; @BeforeEach @@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest { when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json"); - when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady"); - + when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY"); dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock); - webClient = spy(WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType()) - .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(), - dmaapPublisherConfigurationMock.dmaapUserPassword())) - .build()); - requestBodyUriSpec = mock(RequestBodyUriSpec.class); - responseSpec = mock(ResponseSpec.class); } @Test void getHttpResponse_Success() { //given - Integer responseSuccess = 200; - Mono<Integer> expectedResult = Mono.just(responseSuccess); - + int responseSuccess = 200; + ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class); + RestTemplate restTemplate = mock(RestTemplate.class); //when - mockWebClientDependantObject(); - doReturn(expectedResult).when(responseSpec).bodyToMono(String.class); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); - Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess)); + doReturn(mockedResponseEntity).when(restTemplate) + .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any()); + dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate); //then - Assertions.assertEquals(response.block(), expectedResult.block()); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel)) + .expectSubscription().expectNext(mockedResponseEntity).verifyComplete(); } @Test @@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest { //given dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient); //when - when(webClient.post()).thenReturn(requestBodyUriSpec); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class); //then @@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest { .expectError(Exception.class).verify(); } - private void mockWebClientDependantObject() { - RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class); - when(webClient.post()).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec); - doReturn(responseSpec).when(requestHeadersSpec).retrieve(); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY")); } }
\ No newline at end of file |