diff options
33 files changed, 360 insertions, 345 deletions
@@ -134,7 +134,7 @@ <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> - <version>Bismuth-RELEASE</version> + <version>Bismuth-SR10</version> <type>pom</type> <scope>import</scope> </dependency> diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java index 6daf54a1..55dcb398 100644 --- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java +++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java @@ -24,11 +24,18 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.SERVICE_NAME; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + import java.util.Map; +import javax.net.ssl.SSLException; + import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -36,7 +43,7 @@ import reactor.core.publisher.Mono; public class AaiReactiveWebClient { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiReactiveWebClient.class); private String aaiUserName; private String aaiUserPassword; @@ -60,8 +67,19 @@ public class AaiReactiveWebClient { * * @return WebClient */ - public WebClient build() { + public WebClient build() throws SSLException { + SslContext sslContext; + sslContext = SslContextBuilder + .forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + LOGGER.debug("Setting ssl context"); + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(clientOptions -> { + clientOptions.sslContext(sslContext); + clientOptions.disablePool(); + })) .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders)) .filter(basicAuthentication(aaiUserName, aaiUserPassword)) .filter(logRequest()) @@ -72,9 +90,9 @@ public class AaiReactiveWebClient { private ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value))); return Mono.just(clientRequest); }); } @@ -82,7 +100,7 @@ public class AaiReactiveWebClient { private ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); - logger.info("Response Status {}", clientResponse.statusCode()); + LOGGER.info("Response Status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java index be6c63e0..358a4524 100644 --- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java +++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java @@ -20,35 +20,32 @@ package org.onap.dcaegen2.services.prh.service.producer; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.UUID; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; + +import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody; +import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.*; + public class AaiProducerReactiveHttpClient { + private WebClient webClient; private final String aaiHost; private final String aaiProtocol; private final Integer aaiHostPortNumber; private final String aaiBasePath; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; - + private final String aaiPnfPath; /** * Constructor of AaiProducerReactiveHttpClient. @@ -60,6 +57,7 @@ public class AaiProducerReactiveHttpClient { this.aaiProtocol = configuration.aaiProtocol(); this.aaiHostPortNumber = configuration.aaiPort(); this.aaiBasePath = configuration.aaiBasePath(); + this.aaiPnfPath = configuration.aaiPnfPath(); } /** @@ -68,10 +66,12 @@ public class AaiProducerReactiveHttpClient { * @param consumerDmaapModelMono - object which will be sent to AAI database * @return status code of operation */ - public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) { - return consumerDmaapModelMono - .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel)) - .flatMap(this::patchAaiRequest); + public Mono<ClientResponse> getAaiProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { + try { + return patchAaiRequest(consumerDmaapModelMono); + } catch (URISyntaxException e) { + return Mono.error(e); + } } public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) { @@ -79,26 +79,14 @@ public class AaiProducerReactiveHttpClient { return this; } - private Mono<Integer> patchAaiRequest(ConsumerDmaapModel dmaapModel) { - try { - return webClient.patch() + private Mono<ClientResponse> patchAaiRequest(ConsumerDmaapModel dmaapModel) throws URISyntaxException { + return + webClient.patch() .uri(getUri(dmaapModel.getSourceName())) .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) .header(X_INVOCATION_ID, UUID.randomUUID().toString()) - .body(BodyInserters.fromObject(dmaapModel)) - .retrieve() - .onStatus( - HttpStatus::is4xxClientError, - clientResponse -> Mono - .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())) - ) - .onStatus(HttpStatus::is5xxServerError, - clientResponse -> Mono - .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))) - .bodyToMono(Integer.class); - } catch (URISyntaxException e) { - return Mono.error(e); - } + .body(Mono.just(createJsonBody(dmaapModel)), String.class) + .exchange(); } URI getUri(String pnfName) throws URISyntaxException { @@ -106,7 +94,7 @@ public class AaiProducerReactiveHttpClient { .setScheme(aaiProtocol) .setHost(aaiHost) .setPort(aaiHostPortNumber) - .setPath(aaiBasePath + "/" + pnfName) + .setPath(aaiBasePath + aaiPnfPath + "/" + pnfName) .build(); } } diff --git a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java index 9b0f4fe8..4160f356 100644 --- a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java +++ b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.service.producer; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -31,18 +32,19 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; - class AaiProducerReactiveHttpClientTest { private static final Integer SUCCESS_RESPONSE = 200; @@ -50,16 +52,21 @@ class AaiProducerReactiveHttpClientTest { private static AaiClientConfiguration aaiConfigurationMock = mock(AaiClientConfiguration.class); private static WebClient webClient = mock(WebClient.class); - private static ConsumerDmaapModel dmaapModel = new ConsumerDmaapModelForUnitTest(); - private static WebClient.RequestBodyUriSpec requestBodyUriSpec; - private static ResponseSpec responseSpec; - private static Map<String, String> aaiHeaders; + private ConsumerDmaapModel dmaapModel = spy(new ConsumerDmaapModelForUnitTest()); + private WebClient.RequestBodyUriSpec requestBodyUriSpec; + private ResponseSpec responseSpec; - @BeforeAll - static void setUp() { - setupHeaders(); + private Map<String, String> aaiHeaders; + private ClientResponse clientResponse; + private Mono<ClientResponse> clientResponseMono; + @BeforeEach + void setUp() { + setupHeaders(); + clientResponse = mock(ClientResponse.class); + clientResponseMono = Mono.just(clientResponse); + when(dmaapModel.getSourceName()).thenReturn("NOKnhfsadhff"); when(aaiConfigurationMock.aaiHost()).thenReturn("54.45.33.2"); when(aaiConfigurationMock.aaiProtocol()).thenReturn("https"); when(aaiConfigurationMock.aaiPort()).thenReturn(1234); @@ -80,15 +87,6 @@ class AaiProducerReactiveHttpClientTest { responseSpec = mock(ResponseSpec.class); } - private static void setupHeaders() { - aaiHeaders = new HashMap<>(); - aaiHeaders.put("X-FromAppId", "PRH"); - aaiHeaders.put("X-TransactionId", "vv-temp"); - aaiHeaders.put("Accept", "application/json"); - aaiHeaders.put("Real-Time", "true"); - aaiHeaders.put("Content-Type", "application/merge-patch+json"); - } - @Test void getAaiProducerResponse_shouldReturn200() { //given @@ -98,12 +96,11 @@ class AaiProducerReactiveHttpClientTest { mockWebClientDependantObject(); doReturn(expectedResult).when(responseSpec).bodyToMono(Integer.class); aaiProducerReactiveHttpClient.createAaiWebClient(webClient); - Mono<Integer> response = aaiProducerReactiveHttpClient.getAaiProducerResponse(Mono.just(dmaapModel)); //then - StepVerifier.create(response).expectSubscription() + StepVerifier.create(aaiProducerReactiveHttpClient.getAaiProducerResponse(dmaapModel)).expectSubscription() .expectNextMatches(results -> { - Assertions.assertEquals(results, expectedResult.block()); + Assertions.assertEquals(results, clientResponse); return true; }).verifyComplete(); } @@ -115,24 +112,37 @@ class AaiProducerReactiveHttpClientTest { //when when(webClient.patch()).thenReturn(requestBodyUriSpec); aaiProducerReactiveHttpClient.createAaiWebClient(webClient); - when(aaiProducerReactiveHttpClient.getUri("pnfName")).thenThrow(URISyntaxException.class); - + doThrow(URISyntaxException.class).when(aaiProducerReactiveHttpClient).getUri(any()); //then StepVerifier.create( aaiProducerReactiveHttpClient.getAaiProducerResponse( - Mono.just(dmaapModel) + dmaapModel )).expectSubscription().expectError(Exception.class).verify(); } + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(aaiProducerReactiveHttpClient.getUri("NOKnhfsadhff"), + URI.create("https://54.45.33.2:1234/aai/v11/network/pnfs/pnf/NOKnhfsadhff")); + } + + + private void setupHeaders() { + aaiHeaders = new HashMap<>(); + aaiHeaders.put("X-FromAppId", "PRH"); + aaiHeaders.put("X-TransactionId", "vv-temp"); + aaiHeaders.put("Accept", "application/json"); + aaiHeaders.put("Real-Time", "true"); + aaiHeaders.put("Content-Type", "application/merge-patch+json"); + } + private void mockWebClientDependantObject() { WebClient.RequestHeadersSpec requestHeadersSpec = mock(WebClient.RequestHeadersSpec.class); when(webClient.patch()).thenReturn(requestBodyUriSpec); when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec); when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec); - doReturn(responseSpec).when(requestHeadersSpec).retrieve(); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + when(requestBodyUriSpec.body(any(), (Class<Object>) any())).thenReturn(requestHeadersSpec); + when(requestHeadersSpec.exchange()).thenReturn(clientResponseMono); } - } diff --git a/prh-app-server/config/application.yaml b/prh-app-server/config/application.yaml index 390ea9d2..2e6f54df 100644 --- a/prh-app-server/config/application.yaml +++ b/prh-app-server/config/application.yaml @@ -12,8 +12,10 @@ server: logging: level: ROOT: ERROR + org.onap.dcaegen2.services.prh: INFO + reactor.ipc.netty.http.client: WARN org.springframework: ERROR org.springframework.data: ERROR - org.onap.dcaegen2.services.prh: INFO + org.springframework.web.reactive: WARN app: filepath: config/prh_endpoints.json
\ No newline at end of file diff --git a/prh-app-server/config/prh_endpoints.json b/prh-app-server/config/prh_endpoints.json index e5d1c7b8..b3bff7d9 100644 --- a/prh-app-server/config/prh_endpoints.json +++ b/prh-app-server/config/prh_endpoints.json @@ -35,6 +35,8 @@ "aaiBasePath": "/aai/v12", "aaiPnfPath": "/network/pnfs/pnf", "aaiHeaders": { + "X-FromAppId": "prh", + "X-TransactionId": "9999", "Accept": "application/json", "Real-Time": "true", "Content-Type": "application/merge-patch+json" diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index fc485e15..96d47e34 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -21,15 +21,13 @@ package org.onap.dcaegen2.services.prh; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -44,17 +42,7 @@ public class MainApp { } @Bean - ConcurrentTaskScheduler concurrentTaskScheduler() { + TaskScheduler concurrentTaskScheduler() { return new ConcurrentTaskScheduler(); } - - @Bean - ThreadPoolTaskScheduler threadPoolTaskScheduler() { - ThreadPoolTaskScheduler threadPoolTaskScheduler - = new ThreadPoolTaskScheduler(); - threadPoolTaskScheduler.setPoolSize(5); - threadPoolTaskScheduler.setThreadNamePrefix( - "CloudThreadPoolTaskScheduler"); - return threadPoolTaskScheduler; - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index bc4bbf80..11c75e80 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers; @Primary public class CloudConfiguration extends AppConfig { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class); private PrhConfigurationProvider prhConfigurationProvider; private AaiClientConfiguration aaiClientCloudConfiguration; @@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig { } private void parsingConfigError(Throwable throwable) { - logger.warn("Error in case of processing system environment, more details below: ", throwable); + LOGGER.warn("Error in case of processing system environment, more details below: ", throwable); } private void cloudConfigError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); } private void parsingConfigSuccess(EnvProperties envProperties) { - logger.info("Fetching PRH configuration from ConfigBindingService/Consul"); + LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul"); prhConfigurationProvider.callForPrhConfiguration(envProperties) .subscribe(this::parseCloudConfig, this::cloudConfigError); } private void parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); + LOGGER.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java index d3b6cbb3..fdf6847b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java @@ -20,14 +20,15 @@ package org.onap.dcaegen2.services.prh.configuration; -import java.util.Optional; -import java.util.Properties; import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.prh.model.EnvProperties; import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.Properties; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 @@ -35,23 +36,23 @@ import reactor.core.publisher.Flux; class EnvironmentProcessor { private static final int DEFAULT_CONSUL_PORT = 8500; - private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class); private EnvironmentProcessor() { } - static Flux<EnvProperties> evaluate(Properties systemEnvironment) { - logger.info("Loading configuration from system environment variables {}", systemEnvironment); + static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + LOGGER.info("Loading configuration from system environment variables"); EnvProperties envProperties; try { envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) .appName(getService(systemEnvironment)).build(); } catch (EnvironmentLoaderException e) { - return Flux.error(e); + return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); - return Flux.just(envProperties); + LOGGER.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); } private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { @@ -78,8 +79,8 @@ class EnvironmentProcessor { } private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT environment has not been defined"); - logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + LOGGER.warn("$CONSUL_PORT environment has not been defined"); + LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); return DEFAULT_CONSUL_PORT; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 2fb61c06..92574417 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config { private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); AaiClientConfiguration aaiClientConfiguration; @@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config { DmaapPublisherConfiguration.class); } } catch (IOException e) { - logger.warn("Problem with file loading, file: {}", filepath, e); + LOGGER.warn("Problem with file loading, file: {}", filepath, e); } catch (JsonSyntaxException e) { - logger.warn("Problem with Json deserialization", e); + LOGGER.warn("Problem with Json deserialization", e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java index 6132a674..214d6db6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java @@ -21,28 +21,26 @@ package org.onap.dcaegen2.services.prh.configuration; import io.swagger.annotations.ApiOperation; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import javax.annotation.PostConstruct; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Mono; +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 */ @@ -50,24 +48,22 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig { - private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5; + private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker ENTRY = MarkerFactory.getMarker("ENRTY"); - private final ConcurrentTaskScheduler taskScheduler; + private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final TaskScheduler cloudTaskScheduler; private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler, - ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler, - CloudConfiguration cloudConfiguration) { - this.taskScheduler = concurrentTaskScheduler; + public SchedulerConfig(TaskScheduler taskScheduler, + ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { + this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.cloudTaskScheduler = cloudTaskScheduler; this.cloudConfiguration = cloudConfiguration; } @@ -94,9 +90,9 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { - logger.info(ENTRY,"Start scheduling PRH workflow"); + LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { - scheduledPrhTaskFutureList.add(cloudTaskScheduler + scheduledPrhTaskFutureList.add(taskScheduler .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledPrhTaskFutureList.add(taskScheduler diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java index 573724d8..1b2f4a11 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "HeartbeatController", description = "Check liveness of PRH service") public class HeartbeatController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class); /** * Endpoint for checking that PRH is alive. @@ -57,7 +57,7 @@ public class HeartbeatController { } ) public Mono<ResponseEntity<String>> heartbeat() { - logger.trace("Receiving heartbeat request"); + LOGGER.trace("Receiving heartbeat request"); return Mono.defer(() -> Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) ); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index 270fa584..9386b9e8 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class); private final SchedulerConfig schedulerConfig; @@ -52,14 +52,14 @@ public class ScheduleController { @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") public Mono<ResponseEntity<String>> startTasks() { - logger.trace("Receiving start scheduling worker request"); + LOGGER.trace("Receiving start scheduling worker request"); return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); } @RequestMapping(value = "stopPrh", method = RequestMethod.GET) @ApiOperation(value = "Receiving stop scheduling worker request") public Mono<ResponseEntity<String>> stopTask() { - logger.trace("Receiving stop scheduling worker request"); + LOGGER.trace("Receiving stop scheduling worker request"); return schedulerConfig.getResponseFromCancellationOfTasks(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 8742d872..a5ecc1dd 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ @@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser { private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; private static final String SOURCE_NAME = "sourceName"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. * @@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser { */ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message)) .flatMap(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromCallable(() -> new JsonParser().parse(message)); } private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) : getConsumerDmaapModelFromJsonArray(jsonElement); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java index 56ab484b..4f66e25c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java @@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; -public class HttpGetClient { +class HttpGetClient { - private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class); private final WebClient webClient; private final Gson gson; @@ -41,12 +41,12 @@ public class HttpGetClient { this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); } - HttpGetClient(WebClient webClient){ + HttpGetClient(WebClient webClient) { this.webClient = webClient; this.gson = new Gson(); } - public <T> Mono<T> callHttpGet(String url, Class<T> tClass) { + <T> Mono<T> callHttpGet(String url, Class<T> tClass) { return webClient .get() .uri(url) @@ -54,7 +54,7 @@ public class HttpGetClient { .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) .bodyToMono(String.class) - .flatMap(body->getJsonFromRequest(body,tClass)); + .flatMap(body -> getJsonFromRequest(body, tClass)); } private RuntimeException getException(ClientResponse response) { @@ -66,27 +66,26 @@ public class HttpGetClient { try { return Mono.just(parseJson(body, tClass)); } catch (JsonSyntaxException | IllegalStateException e) { - logger.warn("Converting string to json threw error ", e); return Mono.error(e); } } - private <T> T parseJson(String body, Class<T> tClass){ + private <T> T parseJson(String body, Class<T> tClass) { return gson.fromJson(body, tClass); } private static ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { - logger.info("Response status {}", clientResponse.statusCode()); + LOGGER.info("Response status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } private static ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value))); return Mono.just(clientRequest); }); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java index 7af4a7c8..b346bf5e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java @@ -38,7 +38,7 @@ import java.net.URISyntaxException; @Service public class PrhConfigurationProvider { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class); private final HttpGetClient httpGetClient; @@ -56,12 +56,12 @@ public class PrhConfigurationProvider { } private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { - logger.info("Retrieving Config Binding Service endpoint from Consul"); + LOGGER.info("Retrieving Config Binding Service endpoint from Consul"); try { return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) .flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName())); } catch (URISyntaxException e) { - logger.warn("Malformed Consul uri", e); + LOGGER.warn("Malformed Consul uri", e); return Mono.error(e); } } @@ -72,7 +72,7 @@ public class PrhConfigurationProvider { } private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) { - logger.info("Retrieving PRH configuration"); + LOGGER.info("Retrieving PRH configuration"); return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); } @@ -86,7 +86,7 @@ public class PrhConfigurationProvider { return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); } catch (URISyntaxException e) { - logger.warn("Malformed Config Binding Service uri", e); + LOGGER.warn("Malformed Config Binding Service uri", e); return Mono.error(e); } } @@ -99,7 +99,7 @@ public class PrhConfigurationProvider { throw new IllegalStateException("JSON Array was empty"); } } catch (IllegalStateException e) { - logger.warn("Failed to retrieve JSON Object from array", e); + LOGGER.warn("Failed to retrieve JSON Object from array", e); return Mono.error(e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index f58fed61..5a05d374 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ public abstract class AaiProducerTask { - abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; + abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiProducerReactiveHttpClient resolveClient(); + abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException; protected abstract AaiClientConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) - throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException; - WebClient buildWebClient() { + WebClient buildWebClient() throws SSLException { return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index f5b8307b..7ccf75a6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -43,9 +45,8 @@ import reactor.core.publisher.Mono; public class AaiProducerTaskImpl extends AaiProducerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; @@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends } @Override - Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - + Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { + LOGGER.info("Publish to AAI DmaapModel"); return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response)) { - return consumerDmaapModel; + if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { + return Mono.just(consumerDmaapModel); } return Mono .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); @@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends } @Override - AaiProducerReactiveHttpClient resolveClient() { - return new AaiProducerReactiveHttpClient(resolveConfiguration()); + AaiProducerReactiveHttpClient resolveClient() throws SSLException { + return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient()); } @Override @@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends } @Override - protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } aaiProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a912ca9e..d322a43e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -21,7 +21,6 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; @@ -33,7 +32,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException; + abstract Mono<ConsumerDmaapModel> consume(Mono<String> message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -41,7 +40,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 9e1fadf1..0d4be08e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,17 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,11 +40,11 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(Config config) { @@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override public Mono<ConsumerDmaapModel> execute(String object) { - dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", object); + DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + LOGGER.debug(INVOKE, "Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 9a5813d1..7a121d5f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -33,15 +33,14 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; abstract DMaaPProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException; - WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } + abstract RestTemplate buildWebClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 73260381..733b8651 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -39,8 +41,8 @@ import reactor.core.publisher.Mono; @Component public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final Config config; private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - return consumerDmaapModel.flatMap(dmaapModel -> { - logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), - dmaapModel); - return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel); - }); + Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) { + return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } dmaapProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } @Override + RestTemplate buildWebClient() { + return new RestTemplate(); + } + + @Override protected DmaapPublisherConfiguration resolveConfiguration() { return config.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6432a338..f74bc56a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; @@ -72,24 +72,33 @@ public class ScheduledTasks { */ public void scheduleMainPrhEventTask() { MDCVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); + try { + logger.trace("Execution of tasks was registered"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromDMaaPMessage() + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + mainCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private void onComplete() { logger.info("PRH tasks have been completed"); } - private void onSuccess(String responseCode) { - MDC.put(RESPONSE_CODE, responseCode); - logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(ResponseEntity<String> responseCode) { + MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + responseCode.getStatusCode().value()); } private void onError(Throwable throwable) { @@ -98,24 +107,26 @@ public class ScheduledTasks { } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { - return () -> { + + private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Mono.defer(() -> { MDCVariables.setMdcContextMap(contextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); + logger.info("Init configs"); dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }; + }); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { try { return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); } catch (PrhTaskException e) { diff --git a/prh-app-server/src/main/resources/application.properties b/prh-app-server/src/main/resources/application.properties index ac0192ca..e343a360 100644 --- a/prh-app-server/src/main/resources/application.properties +++ b/prh-app-server/src/main/resources/application.properties @@ -9,6 +9,9 @@ logging.level.root=ERROR logging.level.org.springframework=ERROR logging.level.org.springframework.data=ERROR logging.level.org.onap.dcaegen2.services.prh=INFO +logging.level.org.springframework.web.reactive=WARN +logging.level.reactor.ipc.netty.http.client=WARN app.filepath=config/prh_endpoints.json app.xonaprequestid=requestID app.xinvocationid=invocationID + diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java index 54259397..f5cc6b24 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java @@ -29,8 +29,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import javax.net.ssl.SSLException; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; @@ -40,6 +41,8 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -56,14 +59,16 @@ class AaiProducerTaskImplTest { private static final String BASE_PATH = "/aai/v11"; private static final String PNF_PATH = "/network/pnfs/pnf"; - private static ConsumerDmaapModel consumerDmaapModel; - private static AaiProducerTaskImpl aaiProducerTask; - private static AaiClientConfiguration aaiClientConfiguration; - private static AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; - private static AppConfig appConfig; + private ConsumerDmaapModel consumerDmaapModel; + private AaiProducerTaskImpl aaiProducerTask; + private AaiClientConfiguration aaiClientConfiguration; + private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; + private AppConfig appConfig; + private ClientResponse clientResponse; - @BeforeAll - static void setUp() { + @BeforeEach + void setUp() { + clientResponse = mock(ClientResponse.class); aaiClientConfiguration = new ImmutableAaiClientConfiguration.Builder() .aaiHost(AAI_HOST) .aaiPort(PORT) @@ -81,17 +86,6 @@ class AaiProducerTaskImplTest { } - private static void getAaiProducerTask_whenMockingResponseObject(Integer statusCode) { - //given - aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class); - when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any())) - .thenReturn(Mono.just(statusCode)); - when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration); - aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); - when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration); - doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient(); - } - @Test void whenPassedObjectDoesntFit_ThrowsPrhTaskException() { //given/when/ @@ -105,10 +99,10 @@ class AaiProducerTaskImplTest { } @Test - void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException { + void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException { //given/when getAaiProducerTask_whenMockingResponseObject(200); - Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(Mono.just(consumerDmaapModel)); + Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(consumerDmaapModel); //then verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any()); @@ -118,13 +112,26 @@ class AaiProducerTaskImplTest { } @Test - void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException { + void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException, SSLException { //given/when getAaiProducerTask_whenMockingResponseObject(400); - StepVerifier.create(aaiProducerTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() + StepVerifier.create(aaiProducerTask.execute(consumerDmaapModel)).expectSubscription() .expectError(PrhTaskException.class).verify(); //then verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any()); verifyNoMoreInteractions(aaiProducerReactiveHttpClient); } + + private void getAaiProducerTask_whenMockingResponseObject(int statusCode) throws SSLException { + //given + doReturn(HttpStatus.valueOf(statusCode)).when(clientResponse).statusCode(); + Mono<ClientResponse> clientResponseMono = Mono.just(clientResponse); + aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class); + when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any())) + .thenReturn(clientResponseMono); + when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration); + aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); + when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration); + doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient(); + } }
\ No newline at end of file diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java index 82dcdae9..231bf144 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java @@ -20,10 +20,6 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient; @@ -31,6 +27,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import javax.net.ssl.SSLException; + +import static org.mockito.Mockito.*; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -44,7 +44,7 @@ public class AaiPublisherTaskSpy { */ @Bean @Primary - public AaiProducerTask registerSimpleAaiPublisherTask() { + public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException { AppConfig appConfig = spy(AppConfig.class); doReturn(mock(AaiClientConfiguration.class)).when(appConfig).getAaiClientConfiguration(); AaiProducerTaskImpl aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig)); diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java index 453679df..ae7b8e77 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java @@ -20,16 +20,6 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -42,9 +32,14 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 */ @@ -84,15 +79,16 @@ class DmaapPublisherTaskImplTest { @Test void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException { //given - prepareMocksForTests(HttpStatus.OK.value()); + ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.OK.value()); //when - StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() - .expectNext(HttpStatus.OK.toString()).verifyComplete(); + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() + .expectNext(responseEntity).verifyComplete(); //then verify(dMaaPProducerReactiveHttpClient, times(1)) - .getDMaaPProducerResponse(any()); + .getDMaaPProducerResponse(consumerDmaapModel); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @@ -100,24 +96,30 @@ class DmaapPublisherTaskImplTest { @Test void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException { //given - prepareMocksForTests(HttpStatus.UNAUTHORIZED.value()); + ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.UNAUTHORIZED.value()); //when - StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription() - .expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete(); + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() + .expectNext(responseEntity).verifyComplete(); //then - verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any()); + verify(dMaaPProducerReactiveHttpClient, times(1)) + .getDMaaPProducerResponse(consumerDmaapModel); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } - private void prepareMocksForTests(Integer httpResponseCode) { + private ResponseEntity<String> prepareMocksForTests(Integer httpResponseCode) { + ResponseEntity<String> responseEntity = mock(ResponseEntity.class); + //when + when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode)); dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any())) - .thenReturn(Mono.just(httpResponseCode.toString())); + .thenReturn(Mono.just(responseEntity)); dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); + return responseEntity; } }
\ No newline at end of file diff --git a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java index 145d9176..83a078df 100644 --- a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java +++ b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java @@ -42,4 +42,4 @@ public class CommonFunctions { return gsonBuilder.create().toJson(ImmutableConsumerDmaapModel.builder().ipv4(consumerDmaapModel.getIpv4()) .ipv6(consumerDmaapModel.getIpv6()).sourceName(consumerDmaapModel.getSourceName()).build()); } -} +}
\ No newline at end of file diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java index 8ce81757..4327dfbf 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java @@ -20,13 +20,10 @@ package org.onap.dcaegen2.services.prh.service; -import java.util.HashMap; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient { private String dmaaPUserName; private String dmaaPUserPassword; - private String dmaaPContentType; /** * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. @@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient { public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); - this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); - return this; } @@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient { */ public WebClient build() { return WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) .filter(logRequest()) .filter(logResponse()) .build(); diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java index ac13dd61..f9a66378 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java @@ -29,9 +29,8 @@ import java.net.URISyntaxException; import java.util.UUID; import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -41,13 +40,13 @@ import reactor.core.publisher.Mono; */ public class DMaaPConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final String dmaapHostName; private final String dmaapProtocol; private final Integer dmaapPortNumber; private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; private WebClient webClient; /** @@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient { .uri(getUri()) .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) .header(X_INVOCATION_ID, UUID.randomUUID().toString()) + .header(HttpHeaders.CONTENT_TYPE, contentType) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())) ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))) + Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))) .bodyToMono(String.class); } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI "); return Mono.error(e); } } diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java index d049d380..5c72b38c 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.service.producer; +import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID; import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID; @@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -44,11 +47,13 @@ import reactor.core.publisher.Mono; public class DMaaPProducerReactiveHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final String dmaapHostName; private final Integer dmaapPortNumber; private final String dmaapProtocol; private final String dmaapTopicName; - private WebClient webClient; + private final String dmaapContentType; + private RestTemplate restTemplate; /** * Constructor DMaaPProducerReactiveHttpClient. @@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient { this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); } /** @@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient { * @param consumerDmaapModelMono - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { - try { - return webClient - .post() - .uri(getUri()) - .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)) - .header(X_INVOCATION_ID, UUID.randomUUID().toString()) - .body(BodyInserters.fromObject(consumerDmaapModelMono)) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode())) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode()))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI"); - return Mono.error(e); - } + + public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) { + return Mono.defer(() -> { + try { + HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders()); + return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class)); + } catch (URISyntaxException e) { + logger.warn("Exception while evaluating URI"); + return Mono.error(e); + } + }); + } + + private HttpHeaders getAllHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID)); + headers.set(X_INVOCATION_ID, UUID.randomUUID().toString()); + headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType); + return headers; + } - public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; + public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) { + this.restTemplate = restTemplate; return this; } diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java index 1a237562..9f693701 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java @@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest { .expectError(Exception.class).verify(); } + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12")); + } + private void mockDependantObjects() { when(webClient.get()).thenReturn(requestHeadersSpec); when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec); diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java index e8af8cd9..05b74895 100644 --- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java +++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java @@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; -import reactor.core.publisher.Mono; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 */ + class DMaaPProducerReactiveHttpClientTest { private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest { private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock( DmaapPublisherConfiguration.class); private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest(); - private WebClient webClient = mock(WebClient.class); - private RequestBodyUriSpec requestBodyUriSpec; - private ResponseSpec responseSpec; @BeforeEach @@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest { when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH"); when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json"); - when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady"); - + when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY"); dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock); - webClient = spy(WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType()) - .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(), - dmaapPublisherConfigurationMock.dmaapUserPassword())) - .build()); - requestBodyUriSpec = mock(RequestBodyUriSpec.class); - responseSpec = mock(ResponseSpec.class); } @Test void getHttpResponse_Success() { //given - Integer responseSuccess = 200; - Mono<Integer> expectedResult = Mono.just(responseSuccess); - + int responseSuccess = 200; + ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class); + RestTemplate restTemplate = mock(RestTemplate.class); //when - mockWebClientDependantObject(); - doReturn(expectedResult).when(responseSpec).bodyToMono(String.class); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); - Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess)); + doReturn(mockedResponseEntity).when(restTemplate) + .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any()); + dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate); //then - Assertions.assertEquals(response.block(), expectedResult.block()); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel)) + .expectSubscription().expectNext(mockedResponseEntity).verifyComplete(); } @Test @@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest { //given dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient); //when - when(webClient.post()).thenReturn(requestBodyUriSpec); - dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient); when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class); //then @@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest { .expectError(Exception.class).verify(); } - private void mockWebClientDependantObject() { - RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class); - when(webClient.post()).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec); - when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec); - doReturn(responseSpec).when(requestHeadersSpec).retrieve(); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + @Test + void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException { + Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY")); } }
\ No newline at end of file |