diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2018-09-04 09:29:49 +0200 |
---|---|---|
committer | pwielebs <piotr.wielebski@nokia.com> | 2018-09-04 09:29:49 +0200 |
commit | 83df6e1df5ec20627c85af9ba2f49036dd58f328 (patch) | |
tree | de9282995bc4c7b0d0f277760b1d6f3574970794 | |
parent | 3c2766d8a64d21f402b5234e33419a8aed14d7ea (diff) |
Refatoring due to prh workflow
1. Added specified HttpClient for DmaaPPublisher:
*DmaaP Handle transfer-encoding: chunk header and
reject the request if it will be set by the client.
In conclusion no other reactive http client can be
used for pushing something to dmaap.
2. Added sll support to A&AI rective webclient.
*Behaviour of reactive A&AI HttpClient is different as
in native spring have without it.
3. Added 10s fixed time in PRH for requesting DmaaP.
4. Added debug log in reactive/native http clients.
5. Fixed reactive workflow of prh.
6. Updated the version of:
* spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE
* spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE
* spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE
* reactor-bom:Bismuth-RELEASE->Bismuth-SR10
Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8
Issue-ID: DCAEGEN2-743
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
33 files changed, 360 insertions, 345 deletions
@@ -134,7 +134,7 @@ <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> - <version>Bismuth-RELEASE</version> + <version>Bismuth-SR10</version> <type>pom</type> <scope>import</scope> </dependency> diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java index 6daf54a1..55dcb398 100644 --- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java +++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java @@ -24,11 +24,18 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.SERVICE_NAME; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + import java.util.Map; +import javax.net.ssl.SSLException; + import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -36,7 +43,7 @@ import reactor.core.publisher.Mono; public class AaiReactiveWebClient { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiReactiveWebClient.class); private String aaiUserName; private String aaiUserPassword; @@ -60,8 +67,19 @@ public class AaiReactiveWebClient { * * @return WebClient */ - public WebClient build() { + public WebClient build() throws SSLException { + SslContext sslContext; + sslContext = SslContextBuilder + .forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + LOGGER.debug("Setting ssl context"); + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(clientOptions -> { + clientOptions.sslContext(sslContext); + clientOptions.disablePool(); + })) .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders)) .filter(basicAuthentication(aaiUserName, aaiUserPassword)) .filter(logRequest()) @@ -72,9 +90,9 @@ public class AaiReactiveWebClient { private ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value))); return Mono.just(clientRequest); }); } @@ -82,7 +100,7 @@ public class AaiReactiveWebClient { private ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); - logger.info("Response Status {}", clientResponse.statusCode()); + LOGGER.info("Response Status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java index be6c63e0..358a4524 100644 --- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java +++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java @@ -20,35 +20,32 @@ package org.onap.dcaegen2.services.prh.service.producer; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.UUID; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; + +import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody; +import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.*; + public class AaiProducerReactiveHttpClient { + private WebClient webClient; private final String aaiHost; private final String aaiProtocol; private final Integer aaiHostPortNumber; private final String aaiBasePath; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; - + private final String aaiPnfPath; /** * Constructor of AaiProducerReactiveHttpClient. @@ -60,6 +57,7 @@ public class AaiProducerReactiveHttpClient { this.aaiProtocol = configuration.aaiProtocol(); this.aaiHostPortNumber = configuration.aaiPort(); this.aaiBasePath = configuration.aaiBasePath(); + this.aaiPnfPath = configuration.aaiPnfPath(); } /** @@ -68,10 +66,12 @@ public class AaiProducerReactiveHttpClient { * @param consumerDmaapModelMono - object which will be sent to AAI database * @return status code of operation */ - public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) { - return consumerDmaapModelMono - .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel)) - .flatMap(this::patchAaiRequest); + public Mono<ClientResponse> getAaiProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { + try { + return patchAaiRequest(consumerDmaapModelMono); + } catch (URISyntaxException e) { + return Mono.error(e); + } } public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) { @@ -79,26 +79,14 @@ public class AaiProducerReactiveHttpClient { return this; } - private Mono<Integer> patchAaiRequest(ConsumerDmaapModel dmaapModel) { - try { - return webClient.patch() + private Mono<ClientResponse> patchAaiRequest(ConsumerDmaapModel dmaapModel) throws URISyntaxException { + return + webClient.patch() .uri(getUri(dmaapModel.getSourceName())) .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) .header(X_INVOCATION_ID, UUID.randomUUID().toString()) - .body(BodyInserters.fromObject(dmaapModel)) - .retrieve() - .onStatus( - HttpStatus::is4xxClientError, - clientResponse -> Mono - .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())) - ) - .onStatus(HttpStatus::is5xxServerError, - clientResponse -> Mono - .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))) - .bodyToMono(Integer.class); - } catch (URISyntaxException e) { - return Mono.error(e); - } + .body(Mono.just(createJsonBody(dmaapModel)), String.class) + .exchange(); } URI getUri(String pnfName) throws URISyntaxException { @@ -106,7 +94,7 @@ public class AaiProducerReactiveHttpClient { .setScheme(aaiProtocol) .setHost(aaiHost) .setPort(aaiHostPortNumber) - .setPath(aaiBasePath + "/" + pnfName) + .setPath(aaiBasePath + aaiPnfPath + "/" + pnfName) .build(); } } diff --git a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java index 9b0f4fe8..4160f356 100644 --- a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java +++ b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.service.producer; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -31,18 +32,19 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; - class AaiProducerReactiveHttpClientTest { private static final Integer SUCCESS_RESPONSE = 200; @@ -50,16 +52,21 @@ class AaiProducerReactiveHttpClientTest { private static AaiClientConfiguration aaiConfigurationMock = mock(AaiClientConfiguration.class); private static WebClient webClient = mock(WebClient.class); - private static ConsumerDmaapModel dmaapModel = new ConsumerDmaapModelForUnitTest(); - private static WebClient.RequestBodyUriSpec requestBodyUriSpec; - private static ResponseSpec responseSpec; - private static Map<String, String> aaiHeaders; + private ConsumerDmaapModel dmaapModel = spy(new ConsumerDmaapModelForUnitTest()); + private WebClient.RequestBodyUriSpec requestBodyUriSpec; + private ResponseSpec responseSpec; - @BeforeAll - static void setUp() { - setupHeaders(); + private Map<String, String> aaiHeaders; + private ClientResponse clientResponse; + private Mono<ClientResponse> clientResponseMono; + @BeforeEach + void setUp() { + setupHeaders(); + clientResponse = mock(ClientResponse.class); + clientResponseMono = Mono.just(clientResponse); + when(dmaapModel.getSourceName()).thenReturn("NOKnhfsadhff"); when(aaiConfigurationMock.aaiHost()).thenReturn("54.45.33.2"); when(aaiConfigurationMock.aaiProtocol()).thenReturn("https"); when(aaiConfigurationMock.aaiPort()).thenReturn(1234); @@ -80,15 +87,6 @@ class AaiProducerReactiveHttpClientTest { responseSpec = mock(ResponseSpec.class); } - private static void setupHeaders() { - aaiHeaders = new HashMap<>(); - aaiHeaders.put("X-FromAppId", "PRH"); - aaiHeaders.put("X-TransactionId", "vv-temp"); - aaiHeaders.put("Accept", "application/json"); - aaiHeaders.put("Real-Time", "true"); - aaiHeaders.put("Content-Type", "application/merge-patch+json"); - } - @Test void getAaiProducerResponse_shouldReturn200() { //given @@ -98,12 +96,11 @@ class AaiProducerReactiveHttpClientTest { mockWebClientDependantObject(); doReturn(expectedResult).when(responseSpec).bodyToMono(Integer.class); aaiProducerReactiveHttpClient.createAaiWebClient(webClient); - Mono<Integer> response = aaiProducerReactiveHttpClient.getAaiProducerResponse(Mono.just(dmaapModel)); //then - StepVerifier.create(response).expectSubscription() + StepVerifier.create(aaiProducerReactiveHttpClient.getAaiProducerResponse(dmaapModel)).expectSubscription() .expectNextMatches(results -> { - Assertions.assertEquals(results, expectedResult.block()); + Assertions.assertEquals(results, clientResponse); return true; }).verifyComplete(); } @@ -115,24 +112,37 @@ class AaiProducerReactiveHttpClientTest { //when when(webClient.patch()).thenReturn(requestBodyUriSpec); aaiProducerReactiveHttpClient.createAaiWebClient(webClient); - when(aaiProducerReactiveHttpClient.getUri("pnfName")).thenThrow(URISyntaxException.class); - + doThrow(URISyntaxException.class).when(aaiProducerReactiveHttpClient).getUri(any()); //then StepVerifier.create( aaiProducerReactiveHttpClient.getAaiProducerResponse( - Mono.just(dmaapModel) + dmaapModel )).expectSubscription().expectError(Exception.class).verify(); } + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(aaiProducerReactiveHttpClient.getUri("NOKnhfsadhff"), + URI.create("https://54.45.33.2:1234/aai/v11/network/pnfs/pnf/NOKnhfsadhff")); + } + + + private void setupHeaders() { + aaiHeaders = new HashMap<>(); + aaiHeaders.put("X-FromAppId", "PRH"); + aaiHeaders.put("X-TransactionId", "vv-temp"); + aaiHeaders.put("Accept", "application/json"); + aaiHeaders.put("Real-Time", "true"); + aaiHeaders.put("Content-Type", "application/merge-patch+json"); + } + private void mockWebClientDependantObject() { WebClient.RequestHeadersSpec requestHeadersSpec = mock(WebClient.RequestHeadersSpec.class); when(webClient.patch()).thenReturn(requestBodyUriSpec); when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec); when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec); - doReturn(responseSpec).when(requestHeadersSpec).retrieve(); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + when(requestBodyUriSpec.body(any(), (Class<Object>) any())).thenReturn(requestHeadersSpec); + when(requestHeadersSpec.exchange()).thenReturn(clientResponseMono); } - } diff --git a/prh-app-server/config/application.yaml b/prh-app-server/config/application.yaml index 390ea9d2..2e6f54df 100644 --- a/prh-app-server/config/application.yaml +++ b/prh-app-server/config/application.yaml @@ -12,8 +12,10 @@ server: logging: level: ROOT: ERROR + org.onap.dcaegen2.services.prh: INFO + reactor.ipc.netty.http.client: WARN org.springframework: ERROR org.springframework.data: ERROR - org.onap.dcaegen2.services.prh: INFO + org.springframework.web.reactive: WARN app: filepath: config/prh_endpoints.json
\ No newline at end of file diff --git a/prh-app-server/config/prh_endpoints.json b/prh-app-server/config/prh_endpoints.json index e5d1c7b8..b3bff7d9 100644 --- a/prh-app-server/config/prh_endpoints.json +++ b/prh-app-server/config/prh_endpoints.json @@ -35,6 +35,8 @@ "aaiBasePath": "/aai/v12", "aaiPnfPath": "/network/pnfs/pnf", "aaiHeaders": { + "X-FromAppId": "prh", + "X-TransactionId": "9999", "Accept": "application/json", "Real-Time": "true", "Content-Type": "application/merge-patch+json" diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index fc485e15..96d47e34 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -21,15 +21,13 @@ package org.onap.dcaegen2.services.prh; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -44,17 +42,7 @@ public class MainApp { } @Bean - ConcurrentTaskScheduler concurrentTaskScheduler() { + TaskScheduler concurrentTaskScheduler() { return new ConcurrentTaskScheduler(); } - - @Bean - ThreadPoolTaskScheduler threadPoolTaskScheduler() { - ThreadPoolTaskScheduler threadPoolTaskScheduler - = new ThreadPoolTaskScheduler(); - threadPoolTaskScheduler.setPoolSize(5); - threadPoolTaskScheduler.setThreadNamePrefix( - "CloudThreadPoolTaskScheduler"); - return threadPoolTaskScheduler; - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index bc4bbf80..11c75e80 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers; @Primary public class CloudConfiguration extends AppConfig { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class); private PrhConfigurationProvider prhConfigurationProvider; private AaiClientConfiguration aaiClientCloudConfiguration; @@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig { } private void parsingConfigError(Throwable throwable) { - logger.warn("Error in case of processing system environment, more details below: ", throwable); + LOGGER.warn("Error in case of processing system environment, more details below: ", throwable); } private void cloudConfigError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); } private void parsingConfigSuccess(EnvProperties envProperties) { - logger.info("Fetching PRH configuration from ConfigBindingService/Consul"); + LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul"); prhConfigurationProvider.callForPrhConfiguration(envProperties) .subscribe(this::parseCloudConfig, this::cloudConfigError); } private void parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); + LOGGER.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java index d3b6cbb3..fdf6847b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java @@ -20,14 +20,15 @@ package org.onap.dcaegen2.services.prh.configuration; -import java.util.Optional; -import java.util.Properties; import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.prh.model.EnvProperties; import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.Properties; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 @@ -35,23 +36,23 @@ import reactor.core.publisher.Flux; class EnvironmentProcessor { private static final int DEFAULT_CONSUL_PORT = 8500; - private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class); private EnvironmentProcessor() { } - static Flux<EnvProperties> evaluate(Properties systemEnvironment) { - logger.info("Loading configuration from system environment variables {}", systemEnvironment); + static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + LOGGER.info("Loading configuration from system environment variables"); EnvProperties envProperties; try { envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) .appName(getService(systemEnvironment)).build(); } catch (EnvironmentLoaderException e) { - return Flux.error(e); + return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); - return Flux.just(envProperties); + LOGGER.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); } private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { @@ -78,8 +79,8 @@ class EnvironmentProcessor { } private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT environment has not been defined"); - logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + LOGGER.warn("$CONSUL_PORT environment has not been defined"); + LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); return DEFAULT_CONSUL_PORT; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 2fb61c06..92574417 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config { private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); AaiClientConfiguration aaiClientConfiguration; @@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config { DmaapPublisherConfiguration.class); } } catch (IOException e) { - logger.warn("Problem with file loading, file: {}", filepath, e); + LOGGER.warn("Problem with file loading, file: {}", filepath, e); } catch (JsonSyntaxException e) { - logger.warn("Problem with Json deserialization", e); + LOGGER.warn("Problem with Json deserialization", e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java index 6132a674..214d6db6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java @@ -21,28 +21,26 @@ package org.onap.dcaegen2.services.prh.configuration; import io.swagger.annotations.ApiOperation; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import javax.annotation.PostConstruct; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Mono; +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 */ @@ -50,24 +48,22 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig { - private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5; + private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker ENTRY = MarkerFactory.getMarker("ENRTY"); - private final ConcurrentTaskScheduler taskScheduler; + private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final TaskScheduler cloudTaskScheduler; private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler, - ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler, - CloudConfiguration cloudConfiguration) { - this.taskScheduler = concurrentTaskScheduler; + public SchedulerConfig(TaskScheduler taskScheduler, + ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { + this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.cloudTaskScheduler = cloudTaskScheduler; this.cloudConfiguration = cloudConfiguration; } @@ -94,9 +90,9 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { - logger.info(ENTRY,"Start scheduling PRH workflow"); + LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { - scheduledPrhTaskFutureList.add(cloudTaskScheduler + scheduledPrhTaskFutureList.add(taskScheduler .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledPrhTaskFutureList.add(taskScheduler diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java index 573724d8..1b2f4a11 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "HeartbeatController", description = "Check liveness of PRH service") public class HeartbeatController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class); /** * Endpoint for checking that PRH is alive. @@ -57,7 +57,7 @@ public class HeartbeatController { } ) public Mono<ResponseEntity<String>> heartbeat() { - logger.trace("Receiving heartbeat request"); + LOGGER.trace("Receiving heartbeat request"); return Mono.defer(() -> Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) ); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index 270fa584..9386b9e8 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class); private final SchedulerConfig schedulerConfig; @@ -52,14 +52,14 @@ public class ScheduleController { @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") public Mono<ResponseEntity<String>> startTasks() { - logger.trace("Receiving start scheduling worker request"); + LOGGER.trace("Receiving start scheduling worker request"); return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); } @RequestMapping(value = "stopPrh", method = RequestMethod.GET) @ApiOperation(value = "Receiving stop scheduling worker request") public Mono<ResponseEntity<String>> stopTask() { - logger.trace("Receiving stop scheduling worker request"); + LOGGER.trace("Receiving stop scheduling worker request"); return schedulerConfig.getResponseFromCancellationOfTasks(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 8742d872..a5ecc1dd 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ @@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser { private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; private static final String SOURCE_NAME = "sourceName"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. * @@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser { */ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message)) .flatMap(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromCallable(() -> new JsonParser().parse(message)); } private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) : getConsumerDmaapModelFromJsonArray(jsonElement); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java index 56ab484b..4f66e25c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java @@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; -public class HttpGetClient { +class HttpGetClient { - private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class); private final WebClient webClient; private final Gson gson; @@ -41,12 +41,12 @@ public class HttpGetClient { this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); } - HttpGetClient(WebClient webClient){ + HttpGetClient(WebClient webClient) { this.webClient = webClient; this.gson = new Gson(); } - public <T> Mono<T> callHttpGet(String url, Class<T> tClass) { + <T> Mono<T> callHttpGet(String url, Class<T> tClass) { return webClient .get() .uri(url) @@ -54,7 +54,7 @@ public class HttpGetClient { .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) .bodyToMono(String.class) - .flatMap(body->getJsonFromRequest(body,tClass)); + .flatMap(body -> getJsonFromRequest(body, tClass)); } private RuntimeException getException(ClientResponse response) { @@ -66,27 +66,26 @@ public class HttpGetClient { try { return Mono.just(parseJson(body, tClass)); } catch (JsonSyntaxException | IllegalStateException e) { - logger.warn("Converting string to json threw error ", e); return Mono.error(e); } } - private <T> T parseJson(String body, Class<T> tClass){ + private <T> T parseJson(String body, Class<T> tClass) { return gson.fromJson(body, tClass); } private static ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { - logger.info("Response status {}", clientResponse.statusCode()); + LOGGER.info("Response status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } private static ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value))); return Mono.just(clientRequest); }); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java index 7af4a7c8..b346bf5e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java @@ -38,7 +38,7 @@ import java.net.URISyntaxException; @Service public class PrhConfigurationProvider { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class); private final HttpGetClient httpGetClient; @@ -56,12 +56,12 @@ public class PrhConfigurationProvider { } private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { - logger.info("Retrieving Config Binding Service endpoint from Consul"); + LOGGER.info("Retrieving Config Binding Service endpoint from Consul"); try { return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) .flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName())); } catch (URISyntaxException e) { - logger.warn("Malformed Consul uri", e); + LOGGER.warn("Malformed Consul uri", e); return Mono.error(e); } } @@ -72,7 +72,7 @@ public class PrhConfigurationProvider { } private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) { - logger.info("Retrieving PRH configuration"); + LOGGER.info("Retrieving PRH configuration"); return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); } @@ -86,7 +86,7 @@ public class PrhConfigurationProvider { return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); } catch (URISyntaxException e) { - logger.warn("Malformed Config Binding Service uri", e); + LOGGER.warn("Malformed Config Binding Service uri", e); return Mono.error(e); } } @@ -99,7 +99,7 @@ public class PrhConfigurationProvider { throw new IllegalStateException("JSON Array was empty"); } } catch (IllegalStateException e) { - logger.warn("Failed to retrieve JSON Object from array", e); + LOGGER.warn("Failed to retrieve JSON Object from array", e); return Mono.error(e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index f58fed61..5a05d374 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ public abstract class AaiProducerTask { - abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; + abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiProducerReactiveHttpClient resolveClient(); + abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException; protected abstract AaiClientConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) - throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException; - WebClient buildWebClient() { + WebClient buildWebClient() throws SSLException { return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index f5b8307b..7ccf75a6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -43,9 +45,8 @@ import reactor.core.publisher.Mono; public class AaiProducerTaskImpl extends AaiProducerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; @@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends } @Override - Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - + Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { + LOGGER.info("Publish to AAI DmaapModel"); return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response)) { - return consumerDmaapModel; + if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { + return Mono.just(consumerDmaapModel); } return Mono .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); @@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends } @Override - AaiProducerReactiveHttpClient resolveClient() { - return new AaiProducerReactiveHttpClient(resolveConfiguration()); + AaiProducerReactiveHttpClient resolveClient() throws SSLException { + return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient()); } @Override @@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends } @Override - protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } aaiProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a912ca9e..d322a43e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -21,7 +21,6 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; @@ -33,7 +32,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException; + abstract Mono<ConsumerDmaapModel> consume(Mono<String> message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -41,7 +40,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 9e1fadf1..0d4be08e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,17 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,11 +40,11 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(Config config) { @@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override public Mono<ConsumerDmaapModel> execute(String object) { - dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", object); + DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + LOGGER.debug(INVOKE, "Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 9a5813d1..7a121d5f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -33,15 +33,14 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; abstract DMaaPProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException; - WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } + abstract RestTemplate buildWebClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 73260381..733b8651 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -39,8 +41,8 @@ import reactor.core.publisher.Mono; @Component public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final Config config; private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - return consumerDmaapModel.flatMap(dmaapModel -> { - logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), - dmaapModel); - return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel); - }); + Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) { + return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } dmaapProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } @Override + RestTemplate buildWebClient() { + return new RestTemplate(); + } + + @Override protected DmaapPublisherConfiguration resolveConfiguration() { return config.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6432a338..f74bc56a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; @@ -72,24 +72,33 @@ public class ScheduledTasks { */ public void scheduleMainPrhEventTask() { MDCVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); + try { + logger.trace("Execution of tasks was registered"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromDMaaPMessage() + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + mainCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private void onComplete() { logger.info("PRH tasks have been completed"); } - private void onSuccess(String responseCode) { - MDC.put(RESPONSE_CODE, responseCode); - logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(ResponseEntity<String> responseCode) { + MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + responseCode.getStatusCode().value()); } private void onError(Throwable throwable) { @@ -98,24 +107,26 @@ public class ScheduledTasks { } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { - return () -> { + + private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Mono.defer(() -> { MDCVariables.setMdcContextMap(contextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); + logger.info("Init configs"); dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }; + }); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { try { return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); } catch (PrhTaskException e) { diff --git a/prh-app-server/src/main/resources/application.properties b/prh-app-server/src/main/resources/application.properties index ac0192ca..e343a360 100644 --- a/prh-app-server/src/main/resources/application.properties +++ b/prh-app-server/src/main/resources/application.properties @@ -9,6 +9,9 @@ logging.level.root=ERROR logging.level.org.springframework=ERROR logging.level.org.springframework.data=ERROR logging.level.org.onap.dcaegen2.services.prh=INFO +logging.level.org.springframework.web.reactive=WARN +logging.level.reactor.ipc.netty.http.client=WARN app.filepath=config/prh_endpoints.json app.xonaprequestid=requestID app.xinvocationid=invocationID + diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java index 54259397..f5cc6b24 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java @@ -29,8 +29,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import javax.net.ssl.SSLException; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; @@ -40,6 +41,8 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -56,14 +59,16 @@ class AaiProducerTaskImplTest { private static final String BASE_PATH = "/aai/v11"; private static final String PNF_PATH = "/network/pnfs/pnf"; - private static ConsumerDmaapModel consumerDmaapModel; - private static AaiProducerTaskImpl aaiProducerTask; - private static AaiClientConfiguration aaiClientConfiguration; - private static AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; - private static AppConfig appConfig; + private ConsumerDmaapModel consumerDmaapModel; + private AaiProducerTaskImpl aaiProducerTask; + private AaiClientConfiguration aaiClientConfiguration; + private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; + private AppConfig appConfig; + private ClientResponse clientResponse; - @BeforeAll - static void setUp() { + @BeforeEach + void setUp() { + clientResponse = mock(ClientResponse.class); aaiClientConfiguration = new ImmutableAaiClientConfiguration.Builder() .aaiHost(AAI_HOST) .aaiPort(PORT) @@ -81,17 +86,6 @@ class AaiProducerTaskImplTest { } - private static void getAaiProducerTask_whenMockingResponseObject(Integer statusCode) { - //given - aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class); - when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any())) - .thenReturn(Mono.just(statusCode)); - when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration); - aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); - when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration); - doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient(); - } - @Test void whenPassedObjectDoesntFit_ThrowsPrhTaskException() { //given/when/ @@ -105,10 +99,10 @@ class AaiProducerTaskImplTest { } @Test - void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException { + void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException { //given/when getAaiProducerTask_whenMockingResponseObject(200); - Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(Mono.just(consumerDmaapModel)); + Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(consumerDmaapModel); //then verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any()); @@ -118,13 +112,26 @@ class AaiProducerTaskImplTest { } @Test - void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException { + void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException, SSLException { //given/when getAaiProducerTask_whenMockingResponseObject(400); - StepVerifier.create(aaiProducerTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() + StepVerifier.create(aaiProducerTask.execute(consumerDmaapModel)).expectSubscription() .expectError(PrhTaskException.class).verify(); //then verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any()); verifyNoMoreInteractions(aaiProducerReactiveHttpClient); } + + private void getAaiProducerTask_whenMockingResponseObject(int statusCode) throws SSLException { + //given + doReturn(HttpStatus.valueOf(statusCode)).when(clientResponse).statusCode(); + Mono<ClientResponse> clientResponseMono = Mono.just(clientResponse); + aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class); + when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any())) + .thenReturn(clientResponseMono); + when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration); + aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); + when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration); + doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient(); + } }
\ No newline at end of file diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java index 82dcdae9..231bf144 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java @@ -20,10 +20,6 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient; @@ -31,6 +27,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import javax.net.ssl.SSLException; + +import static org.mockito.Mockito.*; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -44,7 +44,7 @@ public class AaiPublisherTaskSpy { */ @Bean @Primary - public AaiProducerTask registerSimpleAaiPublisherTask() { + public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException { AppConfig appConfig = spy(AppConfig.class); doReturn(mock(AaiClientConfiguration.class)).when(appConfig).getAaiClientConfiguration(); AaiProducerTaskImpl aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java index 453679df..ae7b8e77 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java @@ -20,16 +20,6 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -42,9 +32,14 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 */ @@ -84,15 +79,16 @@ class DmaapPublisherTaskImplTest { @Test void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException { //given - prepareMocksForTests(HttpStatus.OK.value()); + ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.OK.value()); //when - StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() - .expectNext(HttpStatus.OK.toString()).verifyComplete(); + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() + .expectNext(responseEntity).verifyComplete(); //then verify(dMaaPProducerReactiveHttpClient, times(1)) - .getDMaaPProducerResponse(any()); + .getDMaaPProducerResponse(consumerDmaapModel); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @@ -100,24 +96,30 @@ class DmaapPublisherTaskImplTest { @Test void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException { //given - prepareMocksForTests(HttpStatus.UNAUTHORIZED.value()); + ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.UNAUTHORIZED.value()); //when - StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() - .expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete(); + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() + .expectNext(responseEntity).verifyComplete(); //then - verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any()); + verify(dMaaPProducerReactiveHttpClient, times(1)) + .getDMaaPProducerResponse(consumerDmaapModel); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } - private void prepareMocksForTests(Integer httpResponseCode) { + private ResponseEntity<String> prepareMocksForTests(Integer httpResponseCode) { + ResponseEntity<String> responseEntity = mock(ResponseEntity.class); + //when + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode)); dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any())) - .thenReturn(Mono.just(httpResponseCode.toString())); + .thenReturn(Mono.just(responseEntity)); dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); + return responseEntity; } }
\ No newline at end of file diff --git a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java index 145d9176..83a078df 100644 --- a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java +++ b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java @@ -42,4 +42,4 @@ public class CommonFunctions { return gsonBuilder.create().toJson(ImmutableConsumerDmaapModel.builder().ipv4(consumerDmaapModel.getIpv4()) .ipv6(consumerDmaapModel.getIpv6()).sourceName(consumerDmaapModel.getSourceName()).build()); } -} +}
\ No newline at end of file diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java index 8ce81757..4327dfbf 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java @@ -20,13 +20,10 @@ package org.onap.dcaegen2.services.prh.service; -import java.util.HashMap; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient { private String dmaaPUserName; private String dmaaPUserPassword; - private String dmaaPContentType; /** * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. @@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient { public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); - this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); - return this; } @@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient { */ public WebClient build() { return WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) .filter(logRequest()) .filter(logResponse()) .build(); diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java index ac13dd61..f9a66378 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java @@ -29,9 +29,8 @@ import java.net.URISyntaxException; import java.util.UUID; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -41,13 +40,13 @@ import reactor.core.publisher.Mono; */ public class DMaaPConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final String dmaapHostName; private final String dmaapProtocol; private final Integer dmaapPortNumber; private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; private WebClient webClient; /** @@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient { .uri(getUri()) .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) .header(X_INVOCATION_ID, UUID.randomUUID().toString()) + .header(HttpHeaders.CONTENT_TYPE, contentType) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())) ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))) .bodyToMono(String.class); } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI "); return Mono.error(e); } } diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java index d049d380..5c72b38c 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.service.producer; +import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID; @@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -44,11 +47,13 @@ import reactor.core.publisher.Mono; public class DMaaPProducerReactiveHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final String dmaapHostName; private final Integer dmaapPortNumber; private final String dmaapProtocol; private final String dmaapTopicName; - private WebClient webClient; + private final String dmaapContentType; + private RestTemplate restTemplate; /** * Constructor DMaaPProducerReactiveHttpClient. @@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient { this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); } /** @@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient { * @param consumerDmaapModelMono - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { - try { - return webClient - .post() - .uri(getUri()) - .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) - .header(X_INVOCATION_ID, UUID.randomUUID().toString()) - .body(BodyInserters.fromObject(consumerDmaapModelMono)) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode())) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode()))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI"); - return Mono.error(e); - } + + public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { + return Mono.defer(() -> { + try { + HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders()); + return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class)); + } catch (URISyntaxException e) { + logger.warn("Exception while evaluating URI"); + return Mono.error(e); + } + }); + } + + private HttpHeaders getAllHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)); + headers.set(X_INVOCATION_ID, UUID.randomUUID().toString()); + headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType); + return headers; + } - public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; + public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) { + this.restTemplate = restTemplate; return this; } diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java index 1a237562..9f693701 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java @@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest { .expectError(Exception.class).verify(); } + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12")); + } + private void mockDependantObjects() { when(webClient.get()).thenReturn(requestHeadersSpec); when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec); diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java index e8af8cd9..05b74895 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java @@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; -import reactor.core.publisher.Mono; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 */ + class DMaaPProducerReactiveHttpClientTest { private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest { private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock( DmaapPublisherConfiguration.class); private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest(); - private WebClient webClient = mock(WebClient.class); - private RequestBodyUriSpec requestBodyUriSpec; - private ResponseSpec responseSpec; @BeforeEach @@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest { when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json"); - when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady"); - + when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY"); dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock); - webClient = spy(WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType()) - .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(), - dmaapPublisherConfigurationMock.dmaapUserPassword())) - .build()); - requestBodyUriSpec = mock(RequestBodyUriSpec.class); - responseSpec = mock(ResponseSpec.class); } @Test void getHttpResponse_Success() { //given - Integer responseSuccess = 200; - Mono<Integer> expectedResult = Mono.just(responseSuccess); - + int responseSuccess = 200; + ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class); + RestTemplate restTemplate = mock(RestTemplate.class); //when - mockWebClientDependantObject(); - doReturn(expectedResult).when(responseSpec).bodyToMono(String.class); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); - Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess)); + doReturn(mockedResponseEntity).when(restTemplate) + .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any()); + dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate); //then - Assertions.assertEquals(response.block(), expectedResult.block()); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel)) + .expectSubscription().expectNext(mockedResponseEntity).verifyComplete(); } @Test @@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest { //given dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient); //when - when(webClient.post()).thenReturn(requestBodyUriSpec); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class); //then @@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest { .expectError(Exception.class).verify(); } - private void mockWebClientDependantObject() { - RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class); - when(webClient.post()).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec); - doReturn(responseSpec).when(requestHeadersSpec).retrieve(); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY")); } }
\ No newline at end of file |