aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
committerpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
commit83df6e1df5ec20627c85af9ba2f49036dd58f328 (patch)
treede9282995bc4c7b0d0f277760b1d6f3574970794 /prh-app-server/src/main/java/org/onap/dcaegen2/services
parent3c2766d8a64d21f402b5234e33419a8aed14d7ea (diff)
Refatoring due to prh workflow
1. Added specified HttpClient for DmaaPPublisher: *DmaaP Handle transfer-encoding: chunk header and reject the request if it will be set by the client. In conclusion no other reactive http client can be used for pushing something to dmaap. 2. Added sll support to A&AI rective webclient. *Behaviour of reactive A&AI HttpClient is different as in native spring have without it. 3. Added 10s fixed time in PRH for requesting DmaaP. 4. Added debug log in reactive/native http clients. 5. Fixed reactive workflow of prh. 6. Updated the version of: * spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE * spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE * spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE * reactor-bom:Bismuth-RELEASE->Bismuth-SR10 Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8 Issue-ID: DCAEGEN2-743 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java38
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java14
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java24
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
17 files changed, 143 insertions, 151 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index fc485e15..96d47e34 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -21,15 +21,13 @@
package org.onap.dcaegen2.services.prh;
import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -44,17 +42,7 @@ public class MainApp {
}
@Bean
- ConcurrentTaskScheduler concurrentTaskScheduler() {
+ TaskScheduler concurrentTaskScheduler() {
return new ConcurrentTaskScheduler();
}
-
- @Bean
- ThreadPoolTaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler threadPoolTaskScheduler
- = new ThreadPoolTaskScheduler();
- threadPoolTaskScheduler.setPoolSize(5);
- threadPoolTaskScheduler.setThreadNamePrefix(
- "CloudThreadPoolTaskScheduler");
- return threadPoolTaskScheduler;
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index bc4bbf80..11c75e80 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers;
@Primary
public class CloudConfiguration extends AppConfig {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class);
private PrhConfigurationProvider prhConfigurationProvider;
private AaiClientConfiguration aaiClientCloudConfiguration;
@@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig {
}
private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ LOGGER.warn("Error in case of processing system environment, more details below: ", throwable);
}
private void cloudConfigError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+ LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
+ LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul");
prhConfigurationProvider.callForPrhConfiguration(envProperties)
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
private void parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
+ LOGGER.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index d3b6cbb3..fdf6847b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -20,14 +20,15 @@
package org.onap.dcaegen2.services.prh.configuration;
-import java.util.Optional;
-import java.util.Properties;
import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.Properties;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -35,23 +36,23 @@ import reactor.core.publisher.Flux;
class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
- private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class);
private EnvironmentProcessor() {
}
- static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
- logger.info("Loading configuration from system environment variables {}", systemEnvironment);
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ LOGGER.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
.consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
.appName(getService(systemEnvironment)).build();
} catch (EnvironmentLoaderException e) {
- return Flux.error(e);
+ return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
- return Flux.just(envProperties);
+ LOGGER.info("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
}
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
@@ -78,8 +79,8 @@ class EnvironmentProcessor {
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
- logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ LOGGER.warn("$CONSUL_PORT environment has not been defined");
+ LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 2fb61c06..92574417 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config {
private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
AaiClientConfiguration aaiClientConfiguration;
@@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config {
DmaapPublisherConfiguration.class);
}
} catch (IOException e) {
- logger.warn("Problem with file loading, file: {}", filepath, e);
+ LOGGER.warn("Problem with file loading, file: {}", filepath, e);
} catch (JsonSyntaxException e) {
- logger.warn("Problem with Json deserialization", e);
+ LOGGER.warn("Problem with Json deserialization", e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 6132a674..214d6db6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -21,28 +21,26 @@
package org.onap.dcaegen2.services.prh.configuration;
import io.swagger.annotations.ApiOperation;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import javax.annotation.PostConstruct;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Mono;
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
*/
@@ -50,24 +48,22 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker ENTRY = MarkerFactory.getMarker("ENRTY");
- private final ConcurrentTaskScheduler taskScheduler;
+ private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final TaskScheduler cloudTaskScheduler;
private final CloudConfiguration cloudConfiguration;
@Autowired
- public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler,
- ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler,
- CloudConfiguration cloudConfiguration) {
- this.taskScheduler = concurrentTaskScheduler;
+ public SchedulerConfig(TaskScheduler taskScheduler,
+ ScheduledTasks scheduledTask,
+ CloudConfiguration cloudConfiguration) {
+ this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
- this.cloudTaskScheduler = cloudTaskScheduler;
this.cloudConfiguration = cloudConfiguration;
}
@@ -94,9 +90,9 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- logger.info(ENTRY,"Start scheduling PRH workflow");
+ LOGGER.info(ENTRY, "Start scheduling PRH workflow");
if (scheduledPrhTaskFutureList.isEmpty()) {
- scheduledPrhTaskFutureList.add(cloudTaskScheduler
+ scheduledPrhTaskFutureList.add(taskScheduler
.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
index 573724d8..1b2f4a11 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "HeartbeatController", description = "Check liveness of PRH service")
public class HeartbeatController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
/**
* Endpoint for checking that PRH is alive.
@@ -57,7 +57,7 @@ public class HeartbeatController {
}
)
public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
+ LOGGER.trace("Receiving heartbeat request");
return Mono.defer(() ->
Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index 270fa584..9386b9e8 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "ScheduleController", description = "Schedule Controller")
public class ScheduleController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class);
private final SchedulerConfig schedulerConfig;
@@ -52,14 +52,14 @@ public class ScheduleController {
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Receiving start scheduling worker request");
+ LOGGER.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
@RequestMapping(value = "stopPrh", method = RequestMethod.GET)
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
+ LOGGER.trace("Receiving stop scheduling worker request");
return schedulerConfig.getResponseFromCancellationOfTasks();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 8742d872..a5ecc1dd 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser {
private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
private static final String SOURCE_NAME = "sourceName";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
@@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser {
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromCallable(() -> new JsonParser().parse(message));
}
private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
index 56ab484b..4f66e25c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
@@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
-public class HttpGetClient {
+class HttpGetClient {
- private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class);
private final WebClient webClient;
private final Gson gson;
@@ -41,12 +41,12 @@ public class HttpGetClient {
this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
}
- HttpGetClient(WebClient webClient){
+ HttpGetClient(WebClient webClient) {
this.webClient = webClient;
this.gson = new Gson();
}
- public <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
+ <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
return webClient
.get()
.uri(url)
@@ -54,7 +54,7 @@ public class HttpGetClient {
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
.bodyToMono(String.class)
- .flatMap(body->getJsonFromRequest(body,tClass));
+ .flatMap(body -> getJsonFromRequest(body, tClass));
}
private RuntimeException getException(ClientResponse response) {
@@ -66,27 +66,26 @@ public class HttpGetClient {
try {
return Mono.just(parseJson(body, tClass));
} catch (JsonSyntaxException | IllegalStateException e) {
- logger.warn("Converting string to json threw error ", e);
return Mono.error(e);
}
}
- private <T> T parseJson(String body, Class<T> tClass){
+ private <T> T parseJson(String body, Class<T> tClass) {
return gson.fromJson(body, tClass);
}
private static ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
- logger.info("Response status {}", clientResponse.statusCode());
+ LOGGER.info("Response status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
private static ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
- logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
index 7af4a7c8..b346bf5e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
@@ -38,7 +38,7 @@ import java.net.URISyntaxException;
@Service
public class PrhConfigurationProvider {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class);
private final HttpGetClient httpGetClient;
@@ -56,12 +56,12 @@ public class PrhConfigurationProvider {
}
private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
- logger.info("Retrieving Config Binding Service endpoint from Consul");
+ LOGGER.info("Retrieving Config Binding Service endpoint from Consul");
try {
return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
.flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName()));
} catch (URISyntaxException e) {
- logger.warn("Malformed Consul uri", e);
+ LOGGER.warn("Malformed Consul uri", e);
return Mono.error(e);
}
}
@@ -72,7 +72,7 @@ public class PrhConfigurationProvider {
}
private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) {
- logger.info("Retrieving PRH configuration");
+ LOGGER.info("Retrieving PRH configuration");
return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
}
@@ -86,7 +86,7 @@ public class PrhConfigurationProvider {
return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
} catch (URISyntaxException e) {
- logger.warn("Malformed Config Binding Service uri", e);
+ LOGGER.warn("Malformed Config Binding Service uri", e);
return Mono.error(e);
}
}
@@ -99,7 +99,7 @@ public class PrhConfigurationProvider {
throw new IllegalStateException("JSON Array was empty");
}
} catch (IllegalStateException e) {
- logger.warn("Failed to retrieve JSON Object from array", e);
+ LOGGER.warn("Failed to retrieve JSON Object from array", e);
return Mono.error(e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index f58fed61..5a05d374 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
public abstract class AaiProducerTask {
- abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException;
+ abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
- abstract AaiProducerReactiveHttpClient resolveClient();
+ abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException;
protected abstract AaiClientConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel)
- throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException;
- WebClient buildWebClient() {
+ WebClient buildWebClient() throws SSLException {
return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index f5b8307b..7ccf75a6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@@ -43,9 +45,8 @@ import reactor.core.publisher.Mono;
public class AaiProducerTaskImpl extends
AaiProducerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
-
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
private final Config config;
private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends
}
@Override
- Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
-
+ Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
+ LOGGER.info("Publish to AAI DmaapModel");
return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
.flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response)) {
- return consumerDmaapModel;
+ if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
+ return Mono.just(consumerDmaapModel);
}
return Mono
.error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
@@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends
}
@Override
- AaiProducerReactiveHttpClient resolveClient() {
- return new AaiProducerReactiveHttpClient(resolveConfiguration());
+ AaiProducerReactiveHttpClient resolveClient() throws SSLException {
+ return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient());
}
@Override
@@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends
}
@Override
- protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException {
+ protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
aaiProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index a912ca9e..d322a43e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -21,7 +21,6 @@
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
@@ -33,7 +32,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
abstract DMaaPConsumerReactiveHttpClient resolveClient();
@@ -41,7 +40,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 9e1fadf1..0d4be08e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,17 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,11 +40,11 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(Config config) {
@@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
public Mono<ConsumerDmaapModel> execute(String object) {
- dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", object);
+ DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ LOGGER.debug(INVOKE, "Method called with arg {}", object);
return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 9a5813d1..7a121d5f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -33,15 +33,14 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
abstract DMaaPProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException;
- WebClient buildWebClient() {
- return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
+ abstract RestTemplate buildWebClient();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 73260381..733b8651 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -39,8 +41,8 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private final Config config;
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- return consumerDmaapModel.flatMap(dmaapModel -> {
- logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
- dmaapModel);
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
- });
+ Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
+ return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
dmaapProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
@Override
+ RestTemplate buildWebClient() {
+ return new RestTemplate();
+ }
+
+ @Override
protected DmaapPublisherConfiguration resolveConfiguration() {
return config.getDmaapPublisherConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6432a338..f74bc56a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
private final AaiProducerTask aaiProducerTask;
@@ -72,24 +72,33 @@ public class ScheduledTasks {
*/
public void scheduleMainPrhEventTask() {
MDCVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
-
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .map(this::publishToAaiConfiguration)
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
+ try {
+ logger.trace("Execution of tasks was registered");
+ CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+ consumeFromDMaaPMessage()
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ logger.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::publishToDmaapConfiguration)
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ mainCountDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
private void onComplete() {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(String responseCode) {
- MDC.put(RESPONSE_CODE, responseCode);
- logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(ResponseEntity<String> responseCode) {
+ MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ responseCode.getStatusCode().value());
}
private void onError(Throwable throwable) {
@@ -98,24 +107,26 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
- return () -> {
+
+ private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Mono.defer(() -> {
MDCVariables.setMdcContextMap(contextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
+ logger.info("Init configs");
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- };
+ });
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
try {
return aaiProducerTask.execute(monoDMaaPModel);
- } catch (PrhTaskException e) {
+ } catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
+ private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
} catch (PrhTaskException e) {