summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java28
-rw-r--r--prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java58
-rw-r--r--prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java66
-rw-r--r--prh-app-server/config/application.yaml4
-rw-r--r--prh-app-server/config/prh_endpoints.json2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java38
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java14
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java24
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
-rw-r--r--prh-app-server/src/main/resources/application.properties3
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java53
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java10
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java42
-rw-r--r--prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java2
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java7
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java12
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java57
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java7
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java58
33 files changed, 360 insertions, 345 deletions
diff --git a/pom.xml b/pom.xml
index 06b03870..82cdab74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,7 +134,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
- <version>Bismuth-RELEASE</version>
+ <version>Bismuth-SR10</version>
<type>pom</type>
<scope>import</scope>
</dependency>
diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
index 6daf54a1..55dcb398 100644
--- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
+++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
@@ -24,11 +24,18 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.SERVICE_NAME;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
import java.util.Map;
+import javax.net.ssl.SSLException;
+
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -36,7 +43,7 @@ import reactor.core.publisher.Mono;
public class AaiReactiveWebClient {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(AaiReactiveWebClient.class);
private String aaiUserName;
private String aaiUserPassword;
@@ -60,8 +67,19 @@ public class AaiReactiveWebClient {
*
* @return WebClient
*/
- public WebClient build() {
+ public WebClient build() throws SSLException {
+ SslContext sslContext;
+ sslContext = SslContextBuilder
+ .forClient()
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ LOGGER.debug("Setting ssl context");
+
return WebClient.builder()
+ .clientConnector(new ReactorClientHttpConnector(clientOptions -> {
+ clientOptions.sslContext(sslContext);
+ clientOptions.disablePool();
+ }))
.defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
.filter(basicAuthentication(aaiUserName, aaiUserPassword))
.filter(logRequest())
@@ -72,9 +90,9 @@ public class AaiReactiveWebClient {
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
- logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
@@ -82,7 +100,7 @@ public class AaiReactiveWebClient {
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
- logger.info("Response Status {}", clientResponse.statusCode());
+ LOGGER.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
index be6c63e0..358a4524 100644
--- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
+++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
@@ -20,35 +20,32 @@
package org.onap.dcaegen2.services.prh.service.producer;
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
+import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.*;
+
public class AaiProducerReactiveHttpClient {
+ private WebClient webClient;
private final String aaiHost;
private final String aaiProtocol;
private final Integer aaiHostPortNumber;
private final String aaiBasePath;
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private WebClient webClient;
-
+ private final String aaiPnfPath;
/**
* Constructor of AaiProducerReactiveHttpClient.
@@ -60,6 +57,7 @@ public class AaiProducerReactiveHttpClient {
this.aaiProtocol = configuration.aaiProtocol();
this.aaiHostPortNumber = configuration.aaiPort();
this.aaiBasePath = configuration.aaiBasePath();
+ this.aaiPnfPath = configuration.aaiPnfPath();
}
/**
@@ -68,10 +66,12 @@ public class AaiProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to AAI database
* @return status code of operation
*/
- public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
- return consumerDmaapModelMono
- .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel))
- .flatMap(this::patchAaiRequest);
+ public Mono<ClientResponse> getAaiProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ try {
+ return patchAaiRequest(consumerDmaapModelMono);
+ } catch (URISyntaxException e) {
+ return Mono.error(e);
+ }
}
public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) {
@@ -79,26 +79,14 @@ public class AaiProducerReactiveHttpClient {
return this;
}
- private Mono<Integer> patchAaiRequest(ConsumerDmaapModel dmaapModel) {
- try {
- return webClient.patch()
+ private Mono<ClientResponse> patchAaiRequest(ConsumerDmaapModel dmaapModel) throws URISyntaxException {
+ return
+ webClient.patch()
.uri(getUri(dmaapModel.getSourceName()))
.header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
.header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .body(BodyInserters.fromObject(dmaapModel))
- .retrieve()
- .onStatus(
- HttpStatus::is4xxClientError,
- clientResponse -> Mono
- .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError,
- clientResponse -> Mono
- .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())))
- .bodyToMono(Integer.class);
- } catch (URISyntaxException e) {
- return Mono.error(e);
- }
+ .body(Mono.just(createJsonBody(dmaapModel)), String.class)
+ .exchange();
}
URI getUri(String pnfName) throws URISyntaxException {
@@ -106,7 +94,7 @@ public class AaiProducerReactiveHttpClient {
.setScheme(aaiProtocol)
.setHost(aaiHost)
.setPort(aaiHostPortNumber)
- .setPath(aaiBasePath + "/" + pnfName)
+ .setPath(aaiBasePath + aaiPnfPath + "/" + pnfName)
.build();
}
}
diff --git a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java
index 9b0f4fe8..4160f356 100644
--- a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java
+++ b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java
@@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.service.producer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -31,18 +32,19 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
+import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-
class AaiProducerReactiveHttpClientTest {
private static final Integer SUCCESS_RESPONSE = 200;
@@ -50,16 +52,21 @@ class AaiProducerReactiveHttpClientTest {
private static AaiClientConfiguration aaiConfigurationMock = mock(AaiClientConfiguration.class);
private static WebClient webClient = mock(WebClient.class);
- private static ConsumerDmaapModel dmaapModel = new ConsumerDmaapModelForUnitTest();
- private static WebClient.RequestBodyUriSpec requestBodyUriSpec;
- private static ResponseSpec responseSpec;
- private static Map<String, String> aaiHeaders;
+ private ConsumerDmaapModel dmaapModel = spy(new ConsumerDmaapModelForUnitTest());
+ private WebClient.RequestBodyUriSpec requestBodyUriSpec;
+ private ResponseSpec responseSpec;
- @BeforeAll
- static void setUp() {
- setupHeaders();
+ private Map<String, String> aaiHeaders;
+ private ClientResponse clientResponse;
+ private Mono<ClientResponse> clientResponseMono;
+ @BeforeEach
+ void setUp() {
+ setupHeaders();
+ clientResponse = mock(ClientResponse.class);
+ clientResponseMono = Mono.just(clientResponse);
+ when(dmaapModel.getSourceName()).thenReturn("NOKnhfsadhff");
when(aaiConfigurationMock.aaiHost()).thenReturn("54.45.33.2");
when(aaiConfigurationMock.aaiProtocol()).thenReturn("https");
when(aaiConfigurationMock.aaiPort()).thenReturn(1234);
@@ -80,15 +87,6 @@ class AaiProducerReactiveHttpClientTest {
responseSpec = mock(ResponseSpec.class);
}
- private static void setupHeaders() {
- aaiHeaders = new HashMap<>();
- aaiHeaders.put("X-FromAppId", "PRH");
- aaiHeaders.put("X-TransactionId", "vv-temp");
- aaiHeaders.put("Accept", "application/json");
- aaiHeaders.put("Real-Time", "true");
- aaiHeaders.put("Content-Type", "application/merge-patch+json");
- }
-
@Test
void getAaiProducerResponse_shouldReturn200() {
//given
@@ -98,12 +96,11 @@ class AaiProducerReactiveHttpClientTest {
mockWebClientDependantObject();
doReturn(expectedResult).when(responseSpec).bodyToMono(Integer.class);
aaiProducerReactiveHttpClient.createAaiWebClient(webClient);
- Mono<Integer> response = aaiProducerReactiveHttpClient.getAaiProducerResponse(Mono.just(dmaapModel));
//then
- StepVerifier.create(response).expectSubscription()
+ StepVerifier.create(aaiProducerReactiveHttpClient.getAaiProducerResponse(dmaapModel)).expectSubscription()
.expectNextMatches(results -> {
- Assertions.assertEquals(results, expectedResult.block());
+ Assertions.assertEquals(results, clientResponse);
return true;
}).verifyComplete();
}
@@ -115,24 +112,37 @@ class AaiProducerReactiveHttpClientTest {
//when
when(webClient.patch()).thenReturn(requestBodyUriSpec);
aaiProducerReactiveHttpClient.createAaiWebClient(webClient);
- when(aaiProducerReactiveHttpClient.getUri("pnfName")).thenThrow(URISyntaxException.class);
-
+ doThrow(URISyntaxException.class).when(aaiProducerReactiveHttpClient).getUri(any());
//then
StepVerifier.create(
aaiProducerReactiveHttpClient.getAaiProducerResponse(
- Mono.just(dmaapModel)
+ dmaapModel
)).expectSubscription().expectError(Exception.class).verify();
}
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(aaiProducerReactiveHttpClient.getUri("NOKnhfsadhff"),
+ URI.create("https://54.45.33.2:1234/aai/v11/network/pnfs/pnf/NOKnhfsadhff"));
+ }
+
+
+ private void setupHeaders() {
+ aaiHeaders = new HashMap<>();
+ aaiHeaders.put("X-FromAppId", "PRH");
+ aaiHeaders.put("X-TransactionId", "vv-temp");
+ aaiHeaders.put("Accept", "application/json");
+ aaiHeaders.put("Real-Time", "true");
+ aaiHeaders.put("Content-Type", "application/merge-patch+json");
+ }
+
private void mockWebClientDependantObject() {
WebClient.RequestHeadersSpec requestHeadersSpec = mock(WebClient.RequestHeadersSpec.class);
when(webClient.patch()).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
- doReturn(responseSpec).when(requestHeadersSpec).retrieve();
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ when(requestBodyUriSpec.body(any(), (Class<Object>) any())).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.exchange()).thenReturn(clientResponseMono);
}
-
}
diff --git a/prh-app-server/config/application.yaml b/prh-app-server/config/application.yaml
index 390ea9d2..2e6f54df 100644
--- a/prh-app-server/config/application.yaml
+++ b/prh-app-server/config/application.yaml
@@ -12,8 +12,10 @@ server:
logging:
level:
ROOT: ERROR
+ org.onap.dcaegen2.services.prh: INFO
+ reactor.ipc.netty.http.client: WARN
org.springframework: ERROR
org.springframework.data: ERROR
- org.onap.dcaegen2.services.prh: INFO
+ org.springframework.web.reactive: WARN
app:
filepath: config/prh_endpoints.json \ No newline at end of file
diff --git a/prh-app-server/config/prh_endpoints.json b/prh-app-server/config/prh_endpoints.json
index e5d1c7b8..b3bff7d9 100644
--- a/prh-app-server/config/prh_endpoints.json
+++ b/prh-app-server/config/prh_endpoints.json
@@ -35,6 +35,8 @@
"aaiBasePath": "/aai/v12",
"aaiPnfPath": "/network/pnfs/pnf",
"aaiHeaders": {
+ "X-FromAppId": "prh",
+ "X-TransactionId": "9999",
"Accept": "application/json",
"Real-Time": "true",
"Content-Type": "application/merge-patch+json"
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index fc485e15..96d47e34 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -21,15 +21,13 @@
package org.onap.dcaegen2.services.prh;
import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -44,17 +42,7 @@ public class MainApp {
}
@Bean
- ConcurrentTaskScheduler concurrentTaskScheduler() {
+ TaskScheduler concurrentTaskScheduler() {
return new ConcurrentTaskScheduler();
}
-
- @Bean
- ThreadPoolTaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler threadPoolTaskScheduler
- = new ThreadPoolTaskScheduler();
- threadPoolTaskScheduler.setPoolSize(5);
- threadPoolTaskScheduler.setThreadNamePrefix(
- "CloudThreadPoolTaskScheduler");
- return threadPoolTaskScheduler;
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index bc4bbf80..11c75e80 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers;
@Primary
public class CloudConfiguration extends AppConfig {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class);
private PrhConfigurationProvider prhConfigurationProvider;
private AaiClientConfiguration aaiClientCloudConfiguration;
@@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig {
}
private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ LOGGER.warn("Error in case of processing system environment, more details below: ", throwable);
}
private void cloudConfigError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+ LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
+ LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul");
prhConfigurationProvider.callForPrhConfiguration(envProperties)
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
private void parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
+ LOGGER.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index d3b6cbb3..fdf6847b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -20,14 +20,15 @@
package org.onap.dcaegen2.services.prh.configuration;
-import java.util.Optional;
-import java.util.Properties;
import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.Properties;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -35,23 +36,23 @@ import reactor.core.publisher.Flux;
class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
- private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class);
private EnvironmentProcessor() {
}
- static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
- logger.info("Loading configuration from system environment variables {}", systemEnvironment);
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ LOGGER.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
.consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
.appName(getService(systemEnvironment)).build();
} catch (EnvironmentLoaderException e) {
- return Flux.error(e);
+ return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
- return Flux.just(envProperties);
+ LOGGER.info("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
}
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
@@ -78,8 +79,8 @@ class EnvironmentProcessor {
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
- logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ LOGGER.warn("$CONSUL_PORT environment has not been defined");
+ LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 2fb61c06..92574417 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config {
private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
AaiClientConfiguration aaiClientConfiguration;
@@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config {
DmaapPublisherConfiguration.class);
}
} catch (IOException e) {
- logger.warn("Problem with file loading, file: {}", filepath, e);
+ LOGGER.warn("Problem with file loading, file: {}", filepath, e);
} catch (JsonSyntaxException e) {
- logger.warn("Problem with Json deserialization", e);
+ LOGGER.warn("Problem with Json deserialization", e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 6132a674..214d6db6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -21,28 +21,26 @@
package org.onap.dcaegen2.services.prh.configuration;
import io.swagger.annotations.ApiOperation;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import javax.annotation.PostConstruct;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Mono;
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
*/
@@ -50,24 +48,22 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker ENTRY = MarkerFactory.getMarker("ENRTY");
- private final ConcurrentTaskScheduler taskScheduler;
+ private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final TaskScheduler cloudTaskScheduler;
private final CloudConfiguration cloudConfiguration;
@Autowired
- public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler,
- ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler,
- CloudConfiguration cloudConfiguration) {
- this.taskScheduler = concurrentTaskScheduler;
+ public SchedulerConfig(TaskScheduler taskScheduler,
+ ScheduledTasks scheduledTask,
+ CloudConfiguration cloudConfiguration) {
+ this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
- this.cloudTaskScheduler = cloudTaskScheduler;
this.cloudConfiguration = cloudConfiguration;
}
@@ -94,9 +90,9 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- logger.info(ENTRY,"Start scheduling PRH workflow");
+ LOGGER.info(ENTRY, "Start scheduling PRH workflow");
if (scheduledPrhTaskFutureList.isEmpty()) {
- scheduledPrhTaskFutureList.add(cloudTaskScheduler
+ scheduledPrhTaskFutureList.add(taskScheduler
.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
index 573724d8..1b2f4a11 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "HeartbeatController", description = "Check liveness of PRH service")
public class HeartbeatController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
/**
* Endpoint for checking that PRH is alive.
@@ -57,7 +57,7 @@ public class HeartbeatController {
}
)
public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
+ LOGGER.trace("Receiving heartbeat request");
return Mono.defer(() ->
Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index 270fa584..9386b9e8 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "ScheduleController", description = "Schedule Controller")
public class ScheduleController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class);
private final SchedulerConfig schedulerConfig;
@@ -52,14 +52,14 @@ public class ScheduleController {
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Receiving start scheduling worker request");
+ LOGGER.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
@RequestMapping(value = "stopPrh", method = RequestMethod.GET)
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
+ LOGGER.trace("Receiving stop scheduling worker request");
return schedulerConfig.getResponseFromCancellationOfTasks();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 8742d872..a5ecc1dd 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser {
private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
private static final String SOURCE_NAME = "sourceName";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
@@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser {
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromCallable(() -> new JsonParser().parse(message));
}
private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
index 56ab484b..4f66e25c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
@@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
-public class HttpGetClient {
+class HttpGetClient {
- private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class);
private final WebClient webClient;
private final Gson gson;
@@ -41,12 +41,12 @@ public class HttpGetClient {
this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
}
- HttpGetClient(WebClient webClient){
+ HttpGetClient(WebClient webClient) {
this.webClient = webClient;
this.gson = new Gson();
}
- public <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
+ <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
return webClient
.get()
.uri(url)
@@ -54,7 +54,7 @@ public class HttpGetClient {
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
.bodyToMono(String.class)
- .flatMap(body->getJsonFromRequest(body,tClass));
+ .flatMap(body -> getJsonFromRequest(body, tClass));
}
private RuntimeException getException(ClientResponse response) {
@@ -66,27 +66,26 @@ public class HttpGetClient {
try {
return Mono.just(parseJson(body, tClass));
} catch (JsonSyntaxException | IllegalStateException e) {
- logger.warn("Converting string to json threw error ", e);
return Mono.error(e);
}
}
- private <T> T parseJson(String body, Class<T> tClass){
+ private <T> T parseJson(String body, Class<T> tClass) {
return gson.fromJson(body, tClass);
}
private static ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
- logger.info("Response status {}", clientResponse.statusCode());
+ LOGGER.info("Response status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
private static ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
- logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
index 7af4a7c8..b346bf5e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
@@ -38,7 +38,7 @@ import java.net.URISyntaxException;
@Service
public class PrhConfigurationProvider {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class);
private final HttpGetClient httpGetClient;
@@ -56,12 +56,12 @@ public class PrhConfigurationProvider {
}
private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
- logger.info("Retrieving Config Binding Service endpoint from Consul");
+ LOGGER.info("Retrieving Config Binding Service endpoint from Consul");
try {
return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
.flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName()));
} catch (URISyntaxException e) {
- logger.warn("Malformed Consul uri", e);
+ LOGGER.warn("Malformed Consul uri", e);
return Mono.error(e);
}
}
@@ -72,7 +72,7 @@ public class PrhConfigurationProvider {
}
private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) {
- logger.info("Retrieving PRH configuration");
+ LOGGER.info("Retrieving PRH configuration");
return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
}
@@ -86,7 +86,7 @@ public class PrhConfigurationProvider {
return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
} catch (URISyntaxException e) {
- logger.warn("Malformed Config Binding Service uri", e);
+ LOGGER.warn("Malformed Config Binding Service uri", e);
return Mono.error(e);
}
}
@@ -99,7 +99,7 @@ public class PrhConfigurationProvider {
throw new IllegalStateException("JSON Array was empty");
}
} catch (IllegalStateException e) {
- logger.warn("Failed to retrieve JSON Object from array", e);
+ LOGGER.warn("Failed to retrieve JSON Object from array", e);
return Mono.error(e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index f58fed61..5a05d374 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
public abstract class AaiProducerTask {
- abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException;
+ abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
- abstract AaiProducerReactiveHttpClient resolveClient();
+ abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException;
protected abstract AaiClientConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel)
- throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException;
- WebClient buildWebClient() {
+ WebClient buildWebClient() throws SSLException {
return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index f5b8307b..7ccf75a6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@@ -43,9 +45,8 @@ import reactor.core.publisher.Mono;
public class AaiProducerTaskImpl extends
AaiProducerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
-
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
private final Config config;
private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends
}
@Override
- Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
-
+ Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
+ LOGGER.info("Publish to AAI DmaapModel");
return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
.flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response)) {
- return consumerDmaapModel;
+ if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
+ return Mono.just(consumerDmaapModel);
}
return Mono
.error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
@@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends
}
@Override
- AaiProducerReactiveHttpClient resolveClient() {
- return new AaiProducerReactiveHttpClient(resolveConfiguration());
+ AaiProducerReactiveHttpClient resolveClient() throws SSLException {
+ return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient());
}
@Override
@@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends
}
@Override
- protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException {
+ protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
aaiProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index a912ca9e..d322a43e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -21,7 +21,6 @@
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
@@ -33,7 +32,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
abstract DMaaPConsumerReactiveHttpClient resolveClient();
@@ -41,7 +40,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 9e1fadf1..0d4be08e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,17 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,11 +40,11 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(Config config) {
@@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
public Mono<ConsumerDmaapModel> execute(String object) {
- dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", object);
+ DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ LOGGER.debug(INVOKE, "Method called with arg {}", object);
return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 9a5813d1..7a121d5f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -33,15 +33,14 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
abstract DMaaPProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException;
- WebClient buildWebClient() {
- return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
+ abstract RestTemplate buildWebClient();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 73260381..733b8651 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -39,8 +41,8 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private final Config config;
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- return consumerDmaapModel.flatMap(dmaapModel -> {
- logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
- dmaapModel);
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
- });
+ Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
+ return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
dmaapProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
@Override
+ RestTemplate buildWebClient() {
+ return new RestTemplate();
+ }
+
+ @Override
protected DmaapPublisherConfiguration resolveConfiguration() {
return config.getDmaapPublisherConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6432a338..f74bc56a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
private final AaiProducerTask aaiProducerTask;
@@ -72,24 +72,33 @@ public class ScheduledTasks {
*/
public void scheduleMainPrhEventTask() {
MDCVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
-
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .map(this::publishToAaiConfiguration)
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
+ try {
+ logger.trace("Execution of tasks was registered");
+ CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+ consumeFromDMaaPMessage()
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ logger.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::publishToDmaapConfiguration)
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ mainCountDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
private void onComplete() {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(String responseCode) {
- MDC.put(RESPONSE_CODE, responseCode);
- logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(ResponseEntity<String> responseCode) {
+ MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ responseCode.getStatusCode().value());
}
private void onError(Throwable throwable) {
@@ -98,24 +107,26 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
- return () -> {
+
+ private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Mono.defer(() -> {
MDCVariables.setMdcContextMap(contextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
+ logger.info("Init configs");
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- };
+ });
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
try {
return aaiProducerTask.execute(monoDMaaPModel);
- } catch (PrhTaskException e) {
+ } catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
+ private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
} catch (PrhTaskException e) {
diff --git a/prh-app-server/src/main/resources/application.properties b/prh-app-server/src/main/resources/application.properties
index ac0192ca..e343a360 100644
--- a/prh-app-server/src/main/resources/application.properties
+++ b/prh-app-server/src/main/resources/application.properties
@@ -9,6 +9,9 @@ logging.level.root=ERROR
logging.level.org.springframework=ERROR
logging.level.org.springframework.data=ERROR
logging.level.org.onap.dcaegen2.services.prh=INFO
+logging.level.org.springframework.web.reactive=WARN
+logging.level.reactor.ipc.netty.http.client=WARN
app.filepath=config/prh_endpoints.json
app.xonaprequestid=requestID
app.xinvocationid=invocationID
+
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
index 54259397..f5cc6b24 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
@@ -29,8 +29,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import javax.net.ssl.SSLException;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
@@ -40,6 +41,8 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,14 +59,16 @@ class AaiProducerTaskImplTest {
private static final String BASE_PATH = "/aai/v11";
private static final String PNF_PATH = "/network/pnfs/pnf";
- private static ConsumerDmaapModel consumerDmaapModel;
- private static AaiProducerTaskImpl aaiProducerTask;
- private static AaiClientConfiguration aaiClientConfiguration;
- private static AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
- private static AppConfig appConfig;
+ private ConsumerDmaapModel consumerDmaapModel;
+ private AaiProducerTaskImpl aaiProducerTask;
+ private AaiClientConfiguration aaiClientConfiguration;
+ private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
+ private AppConfig appConfig;
+ private ClientResponse clientResponse;
- @BeforeAll
- static void setUp() {
+ @BeforeEach
+ void setUp() {
+ clientResponse = mock(ClientResponse.class);
aaiClientConfiguration = new ImmutableAaiClientConfiguration.Builder()
.aaiHost(AAI_HOST)
.aaiPort(PORT)
@@ -81,17 +86,6 @@ class AaiProducerTaskImplTest {
}
- private static void getAaiProducerTask_whenMockingResponseObject(Integer statusCode) {
- //given
- aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
- when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any()))
- .thenReturn(Mono.just(statusCode));
- when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
- aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
- when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration);
- doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
- }
-
@Test
void whenPassedObjectDoesntFit_ThrowsPrhTaskException() {
//given/when/
@@ -105,10 +99,10 @@ class AaiProducerTaskImplTest {
}
@Test
- void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException {
+ void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException {
//given/when
getAaiProducerTask_whenMockingResponseObject(200);
- Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(Mono.just(consumerDmaapModel));
+ Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(consumerDmaapModel);
//then
verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
@@ -118,13 +112,26 @@ class AaiProducerTaskImplTest {
}
@Test
- void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException {
+ void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException, SSLException {
//given/when
getAaiProducerTask_whenMockingResponseObject(400);
- StepVerifier.create(aaiProducerTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
+ StepVerifier.create(aaiProducerTask.execute(consumerDmaapModel)).expectSubscription()
.expectError(PrhTaskException.class).verify();
//then
verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
verifyNoMoreInteractions(aaiProducerReactiveHttpClient);
}
+
+ private void getAaiProducerTask_whenMockingResponseObject(int statusCode) throws SSLException {
+ //given
+ doReturn(HttpStatus.valueOf(statusCode)).when(clientResponse).statusCode();
+ Mono<ClientResponse> clientResponseMono = Mono.just(clientResponse);
+ aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
+ when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any()))
+ .thenReturn(clientResponseMono);
+ when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
+ aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
+ when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration);
+ doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
+ }
} \ No newline at end of file
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
index 82dcdae9..231bf144 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
@@ -20,10 +20,6 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
@@ -31,6 +27,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
+import javax.net.ssl.SSLException;
+
+import static org.mockito.Mockito.*;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@@ -44,7 +44,7 @@ public class AaiPublisherTaskSpy {
*/
@Bean
@Primary
- public AaiProducerTask registerSimpleAaiPublisherTask() {
+ public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(AaiClientConfiguration.class)).when(appConfig).getAaiClientConfiguration();
AaiProducerTaskImpl aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
index 453679df..ae7b8e77 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
@@ -20,16 +20,6 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
@@ -42,9 +32,14 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
*/
@@ -84,15 +79,16 @@ class DmaapPublisherTaskImplTest {
@Test
void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException {
//given
- prepareMocksForTests(HttpStatus.OK.value());
+ ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.OK.value());
//when
- StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
- .expectNext(HttpStatus.OK.toString()).verifyComplete();
+ when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
+ .expectNext(responseEntity).verifyComplete();
//then
verify(dMaaPProducerReactiveHttpClient, times(1))
- .getDMaaPProducerResponse(any());
+ .getDMaaPProducerResponse(consumerDmaapModel);
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -100,24 +96,30 @@ class DmaapPublisherTaskImplTest {
@Test
void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException {
//given
- prepareMocksForTests(HttpStatus.UNAUTHORIZED.value());
+ ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.UNAUTHORIZED.value());
//when
- StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
- .expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete();
+ when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED);
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
+ .expectNext(responseEntity).verifyComplete();
//then
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(1))
+ .getDMaaPProducerResponse(consumerDmaapModel);
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
- private void prepareMocksForTests(Integer httpResponseCode) {
+ private ResponseEntity<String> prepareMocksForTests(Integer httpResponseCode) {
+ ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
+ //when
+ when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any()))
- .thenReturn(Mono.just(httpResponseCode.toString()));
+ .thenReturn(Mono.just(responseEntity));
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ return responseEntity;
}
} \ No newline at end of file
diff --git a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java
index 145d9176..83a078df 100644
--- a/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java
+++ b/prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java
@@ -42,4 +42,4 @@ public class CommonFunctions {
return gsonBuilder.create().toJson(ImmutableConsumerDmaapModel.builder().ipv4(consumerDmaapModel.getIpv4())
.ipv6(consumerDmaapModel.getIpv6()).sourceName(consumerDmaapModel.getSourceName()).build());
}
-}
+} \ No newline at end of file
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
index 8ce81757..4327dfbf 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.prh.service;
-import java.util.HashMap;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserName;
private String dmaaPUserPassword;
- private String dmaaPContentType;
/**
* Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
@@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient {
public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
- this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
-
return this;
}
@@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient {
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
.filter(logRequest())
.filter(logResponse())
.build();
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
index ac13dd61..f9a66378 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -29,9 +29,8 @@ import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -41,13 +40,13 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final String dmaapHostName;
private final String dmaapProtocol;
private final Integer dmaapPortNumber;
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
private WebClient webClient;
/**
@@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient {
.uri(getUri())
.header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
.header(X_INVOCATION_ID, UUID.randomUUID().toString())
+ .header(HttpHeaders.CONTENT_TYPE, contentType)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
return Mono.error(e);
}
}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index d049d380..5c72b38c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service.producer;
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
@@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -44,11 +47,13 @@ import reactor.core.publisher.Mono;
public class DMaaPProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
private final String dmaapTopicName;
- private WebClient webClient;
+ private final String dmaapContentType;
+ private RestTemplate restTemplate;
/**
* Constructor DMaaPProducerReactiveHttpClient.
@@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient {
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
/**
@@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
- .header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
+
+ public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ return Mono.defer(() -> {
+ try {
+ HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+ } catch (URISyntaxException e) {
+ logger.warn("Exception while evaluating URI");
+ return Mono.error(e);
+ }
+ });
+ }
+
+ private HttpHeaders getAllHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ return headers;
+
}
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
+ public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
return this;
}
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
index 1a237562..9f693701 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
@@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"));
+ }
+
private void mockDependantObjects() {
when(webClient.get()).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
index e8af8cd9..05b74895 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
@@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-import reactor.core.publisher.Mono;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
+
class DMaaPProducerReactiveHttpClientTest {
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest {
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
DmaapPublisherConfiguration.class);
private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
- private WebClient webClient = mock(WebClient.class);
- private RequestBodyUriSpec requestBodyUriSpec;
- private ResponseSpec responseSpec;
@BeforeEach
@@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
-
+ when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY");
dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
- webClient = spy(WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
- .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
- dmaapPublisherConfigurationMock.dmaapUserPassword()))
- .build());
- requestBodyUriSpec = mock(RequestBodyUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
}
@Test
void getHttpResponse_Success() {
//given
- Integer responseSuccess = 200;
- Mono<Integer> expectedResult = Mono.just(responseSuccess);
-
+ int responseSuccess = 200;
+ ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class);
+ RestTemplate restTemplate = mock(RestTemplate.class);
//when
- mockWebClientDependantObject();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess));
+ doReturn(mockedResponseEntity).when(restTemplate)
+ .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any());
+ dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate);
//then
- Assertions.assertEquals(response.block(), expectedResult.block());
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
+ .expectSubscription().expectNext(mockedResponseEntity).verifyComplete();
}
@Test
@@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest {
//given
dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient);
//when
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
//then
@@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest {
.expectError(Exception.class).verify();
}
- private void mockWebClientDependantObject() {
- RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
- doReturn(responseSpec).when(requestHeadersSpec).retrieve();
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ @Test
+ void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+ Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY"));
}
} \ No newline at end of file