summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java38
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java14
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java24
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
17 files changed, 143 insertions, 151 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index fc485e15..96d47e34 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -21,15 +21,13 @@
package org.onap.dcaegen2.services.prh;
import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -44,17 +42,7 @@ public class MainApp {
}
@Bean
- ConcurrentTaskScheduler concurrentTaskScheduler() {
+ TaskScheduler concurrentTaskScheduler() {
return new ConcurrentTaskScheduler();
}
-
- @Bean
- ThreadPoolTaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler threadPoolTaskScheduler
- = new ThreadPoolTaskScheduler();
- threadPoolTaskScheduler.setPoolSize(5);
- threadPoolTaskScheduler.setThreadNamePrefix(
- "CloudThreadPoolTaskScheduler");
- return threadPoolTaskScheduler;
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index bc4bbf80..11c75e80 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers;
@Primary
public class CloudConfiguration extends AppConfig {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class);
private PrhConfigurationProvider prhConfigurationProvider;
private AaiClientConfiguration aaiClientCloudConfiguration;
@@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig {
}
private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ LOGGER.warn("Error in case of processing system environment, more details below: ", throwable);
}
private void cloudConfigError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+ LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
+ LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul");
prhConfigurationProvider.callForPrhConfiguration(envProperties)
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
private void parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
+ LOGGER.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index d3b6cbb3..fdf6847b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -20,14 +20,15 @@
package org.onap.dcaegen2.services.prh.configuration;
-import java.util.Optional;
-import java.util.Properties;
import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.Properties;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -35,23 +36,23 @@ import reactor.core.publisher.Flux;
class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
- private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class);
private EnvironmentProcessor() {
}
- static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
- logger.info("Loading configuration from system environment variables {}", systemEnvironment);
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ LOGGER.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
.consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
.appName(getService(systemEnvironment)).build();
} catch (EnvironmentLoaderException e) {
- return Flux.error(e);
+ return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
- return Flux.just(envProperties);
+ LOGGER.info("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
}
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
@@ -78,8 +79,8 @@ class EnvironmentProcessor {
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
- logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ LOGGER.warn("$CONSUL_PORT environment has not been defined");
+ LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 2fb61c06..92574417 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config {
private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
AaiClientConfiguration aaiClientConfiguration;
@@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config {
DmaapPublisherConfiguration.class);
}
} catch (IOException e) {
- logger.warn("Problem with file loading, file: {}", filepath, e);
+ LOGGER.warn("Problem with file loading, file: {}", filepath, e);
} catch (JsonSyntaxException e) {
- logger.warn("Problem with Json deserialization", e);
+ LOGGER.warn("Problem with Json deserialization", e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 6132a674..214d6db6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -21,28 +21,26 @@
package org.onap.dcaegen2.services.prh.configuration;
import io.swagger.annotations.ApiOperation;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import javax.annotation.PostConstruct;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Mono;
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
*/
@@ -50,24 +48,22 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker ENTRY = MarkerFactory.getMarker("ENRTY");
- private final ConcurrentTaskScheduler taskScheduler;
+ private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final TaskScheduler cloudTaskScheduler;
private final CloudConfiguration cloudConfiguration;
@Autowired
- public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler,
- ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler,
- CloudConfiguration cloudConfiguration) {
- this.taskScheduler = concurrentTaskScheduler;
+ public SchedulerConfig(TaskScheduler taskScheduler,
+ ScheduledTasks scheduledTask,
+ CloudConfiguration cloudConfiguration) {
+ this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
- this.cloudTaskScheduler = cloudTaskScheduler;
this.cloudConfiguration = cloudConfiguration;
}
@@ -94,9 +90,9 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- logger.info(ENTRY,"Start scheduling PRH workflow");
+ LOGGER.info(ENTRY, "Start scheduling PRH workflow");
if (scheduledPrhTaskFutureList.isEmpty()) {
- scheduledPrhTaskFutureList.add(cloudTaskScheduler
+ scheduledPrhTaskFutureList.add(taskScheduler
.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
index 573724d8..1b2f4a11 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "HeartbeatController", description = "Check liveness of PRH service")
public class HeartbeatController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
/**
* Endpoint for checking that PRH is alive.
@@ -57,7 +57,7 @@ public class HeartbeatController {
}
)
public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
+ LOGGER.trace("Receiving heartbeat request");
return Mono.defer(() ->
Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index 270fa584..9386b9e8 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
@Api(value = "ScheduleController", description = "Schedule Controller")
public class ScheduleController {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class);
private final SchedulerConfig schedulerConfig;
@@ -52,14 +52,14 @@ public class ScheduleController {
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Receiving start scheduling worker request");
+ LOGGER.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
@RequestMapping(value = "stopPrh", method = RequestMethod.GET)
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
+ LOGGER.trace("Receiving stop scheduling worker request");
return schedulerConfig.getResponseFromCancellationOfTasks();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 8742d872..a5ecc1dd 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser {
private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
private static final String SOURCE_NAME = "sourceName";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
@@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser {
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromCallable(() -> new JsonParser().parse(message));
}
private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
index 56ab484b..4f66e25c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
@@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
-public class HttpGetClient {
+class HttpGetClient {
- private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class);
private final WebClient webClient;
private final Gson gson;
@@ -41,12 +41,12 @@ public class HttpGetClient {
this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
}
- HttpGetClient(WebClient webClient){
+ HttpGetClient(WebClient webClient) {
this.webClient = webClient;
this.gson = new Gson();
}
- public <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
+ <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
return webClient
.get()
.uri(url)
@@ -54,7 +54,7 @@ public class HttpGetClient {
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
.bodyToMono(String.class)
- .flatMap(body->getJsonFromRequest(body,tClass));
+ .flatMap(body -> getJsonFromRequest(body, tClass));
}
private RuntimeException getException(ClientResponse response) {
@@ -66,27 +66,26 @@ public class HttpGetClient {
try {
return Mono.just(parseJson(body, tClass));
} catch (JsonSyntaxException | IllegalStateException e) {
- logger.warn("Converting string to json threw error ", e);
return Mono.error(e);
}
}
- private <T> T parseJson(String body, Class<T> tClass){
+ private <T> T parseJson(String body, Class<T> tClass) {
return gson.fromJson(body, tClass);
}
private static ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
- logger.info("Response status {}", clientResponse.statusCode());
+ LOGGER.info("Response status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
private static ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
- logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
index 7af4a7c8..b346bf5e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
@@ -38,7 +38,7 @@ import java.net.URISyntaxException;
@Service
public class PrhConfigurationProvider {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class);
private final HttpGetClient httpGetClient;
@@ -56,12 +56,12 @@ public class PrhConfigurationProvider {
}
private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
- logger.info("Retrieving Config Binding Service endpoint from Consul");
+ LOGGER.info("Retrieving Config Binding Service endpoint from Consul");
try {
return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
.flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName()));
} catch (URISyntaxException e) {
- logger.warn("Malformed Consul uri", e);
+ LOGGER.warn("Malformed Consul uri", e);
return Mono.error(e);
}
}
@@ -72,7 +72,7 @@ public class PrhConfigurationProvider {
}
private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) {
- logger.info("Retrieving PRH configuration");
+ LOGGER.info("Retrieving PRH configuration");
return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
}
@@ -86,7 +86,7 @@ public class PrhConfigurationProvider {
return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
} catch (URISyntaxException e) {
- logger.warn("Malformed Config Binding Service uri", e);
+ LOGGER.warn("Malformed Config Binding Service uri", e);
return Mono.error(e);
}
}
@@ -99,7 +99,7 @@ public class PrhConfigurationProvider {
throw new IllegalStateException("JSON Array was empty");
}
} catch (IllegalStateException e) {
- logger.warn("Failed to retrieve JSON Object from array", e);
+ LOGGER.warn("Failed to retrieve JSON Object from array", e);
return Mono.error(e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index f58fed61..5a05d374 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
public abstract class AaiProducerTask {
- abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException;
+ abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
- abstract AaiProducerReactiveHttpClient resolveClient();
+ abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException;
protected abstract AaiClientConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel)
- throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException;
- WebClient buildWebClient() {
+ WebClient buildWebClient() throws SSLException {
return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index f5b8307b..7ccf75a6 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@@ -43,9 +45,8 @@ import reactor.core.publisher.Mono;
public class AaiProducerTaskImpl extends
AaiProducerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
-
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
private final Config config;
private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends
}
@Override
- Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
-
+ Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
+ LOGGER.info("Publish to AAI DmaapModel");
return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
.flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response)) {
- return consumerDmaapModel;
+ if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
+ return Mono.just(consumerDmaapModel);
}
return Mono
.error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
@@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends
}
@Override
- AaiProducerReactiveHttpClient resolveClient() {
- return new AaiProducerReactiveHttpClient(resolveConfiguration());
+ AaiProducerReactiveHttpClient resolveClient() throws SSLException {
+ return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient());
}
@Override
@@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends
}
@Override
- protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException {
+ protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException, SSLException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
aaiProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index a912ca9e..d322a43e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -21,7 +21,6 @@
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
@@ -33,7 +32,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
abstract DMaaPConsumerReactiveHttpClient resolveClient();
@@ -41,7 +40,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 9e1fadf1..0d4be08e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,17 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Map;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,11 +40,11 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(Config config) {
@@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
public Mono<ConsumerDmaapModel> execute(String object) {
- dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", object);
+ DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ LOGGER.debug(INVOKE, "Method called with arg {}", object);
return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 9a5813d1..7a121d5f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -33,15 +33,14 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
abstract DMaaPProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws PrhTaskException;
- WebClient buildWebClient() {
- return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
+ abstract RestTemplate buildWebClient();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 73260381..733b8651 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -39,8 +41,8 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private final Config config;
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- return consumerDmaapModel.flatMap(dmaapModel -> {
- logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
- dmaapModel);
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
- });
+ Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
+ return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
dmaapProducerReactiveHttpClient = resolveClient();
- logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+ LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
@Override
+ RestTemplate buildWebClient() {
+ return new RestTemplate();
+ }
+
+ @Override
protected DmaapPublisherConfiguration resolveConfiguration() {
return config.getDmaapPublisherConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6432a338..f74bc56a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
private final AaiProducerTask aaiProducerTask;
@@ -72,24 +72,33 @@ public class ScheduledTasks {
*/
public void scheduleMainPrhEventTask() {
MDCVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
-
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .map(this::publishToAaiConfiguration)
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
+ try {
+ logger.trace("Execution of tasks was registered");
+ CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+ consumeFromDMaaPMessage()
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ logger.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::publishToDmaapConfiguration)
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ mainCountDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
private void onComplete() {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(String responseCode) {
- MDC.put(RESPONSE_CODE, responseCode);
- logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(ResponseEntity<String> responseCode) {
+ MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ responseCode.getStatusCode().value());
}
private void onError(Throwable throwable) {
@@ -98,24 +107,26 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
- return () -> {
+
+ private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Mono.defer(() -> {
MDCVariables.setMdcContextMap(contextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
+ logger.info("Init configs");
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- };
+ });
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
try {
return aaiProducerTask.execute(monoDMaaPModel);
- } catch (PrhTaskException e) {
+ } catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
+ private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
} catch (PrhTaskException e) {