diff options
Diffstat (limited to 'prh-app-server/src/main/java/org')
17 files changed, 143 insertions, 151 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index fc485e15..96d47e34 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -21,15 +21,13 @@ package org.onap.dcaegen2.services.prh; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -44,17 +42,7 @@ public class MainApp { } @Bean - ConcurrentTaskScheduler concurrentTaskScheduler() { + TaskScheduler concurrentTaskScheduler() { return new ConcurrentTaskScheduler(); } - - @Bean - ThreadPoolTaskScheduler threadPoolTaskScheduler() { - ThreadPoolTaskScheduler threadPoolTaskScheduler - = new ThreadPoolTaskScheduler(); - threadPoolTaskScheduler.setPoolSize(5); - threadPoolTaskScheduler.setThreadNamePrefix( - "CloudThreadPoolTaskScheduler"); - return threadPoolTaskScheduler; - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index bc4bbf80..11c75e80 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers; @Primary public class CloudConfiguration extends AppConfig { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class); private PrhConfigurationProvider prhConfigurationProvider; private AaiClientConfiguration aaiClientCloudConfiguration; @@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig { } private void parsingConfigError(Throwable throwable) { - logger.warn("Error in case of processing system environment, more details below: ", throwable); + LOGGER.warn("Error in case of processing system environment, more details below: ", throwable); } private void cloudConfigError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); } private void parsingConfigSuccess(EnvProperties envProperties) { - logger.info("Fetching PRH configuration from ConfigBindingService/Consul"); + LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul"); prhConfigurationProvider.callForPrhConfiguration(envProperties) .subscribe(this::parseCloudConfig, this::cloudConfigError); } private void parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); + LOGGER.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java index d3b6cbb3..fdf6847b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java @@ -20,14 +20,15 @@ package org.onap.dcaegen2.services.prh.configuration; -import java.util.Optional; -import java.util.Properties; import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.prh.model.EnvProperties; import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.Properties; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 @@ -35,23 +36,23 @@ import reactor.core.publisher.Flux; class EnvironmentProcessor { private static final int DEFAULT_CONSUL_PORT = 8500; - private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class); private EnvironmentProcessor() { } - static Flux<EnvProperties> evaluate(Properties systemEnvironment) { - logger.info("Loading configuration from system environment variables {}", systemEnvironment); + static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + LOGGER.info("Loading configuration from system environment variables"); EnvProperties envProperties; try { envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) .appName(getService(systemEnvironment)).build(); } catch (EnvironmentLoaderException e) { - return Flux.error(e); + return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); - return Flux.just(envProperties); + LOGGER.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); } private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { @@ -78,8 +79,8 @@ class EnvironmentProcessor { } private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT environment has not been defined"); - logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + LOGGER.warn("$CONSUL_PORT environment has not been defined"); + LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); return DEFAULT_CONSUL_PORT; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 2fb61c06..92574417 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config { private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); AaiClientConfiguration aaiClientConfiguration; @@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config { DmaapPublisherConfiguration.class); } } catch (IOException e) { - logger.warn("Problem with file loading, file: {}", filepath, e); + LOGGER.warn("Problem with file loading, file: {}", filepath, e); } catch (JsonSyntaxException e) { - logger.warn("Problem with Json deserialization", e); + LOGGER.warn("Problem with Json deserialization", e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java index 6132a674..214d6db6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java @@ -21,28 +21,26 @@ package org.onap.dcaegen2.services.prh.configuration; import io.swagger.annotations.ApiOperation; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import javax.annotation.PostConstruct; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import reactor.core.publisher.Mono; +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 */ @@ -50,24 +48,22 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig { - private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5; + private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker ENTRY = MarkerFactory.getMarker("ENRTY"); - private final ConcurrentTaskScheduler taskScheduler; + private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final TaskScheduler cloudTaskScheduler; private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler, - ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler, - CloudConfiguration cloudConfiguration) { - this.taskScheduler = concurrentTaskScheduler; + public SchedulerConfig(TaskScheduler taskScheduler, + ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { + this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.cloudTaskScheduler = cloudTaskScheduler; this.cloudConfiguration = cloudConfiguration; } @@ -94,9 +90,9 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { - logger.info(ENTRY,"Start scheduling PRH workflow"); + LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { - scheduledPrhTaskFutureList.add(cloudTaskScheduler + scheduledPrhTaskFutureList.add(taskScheduler .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledPrhTaskFutureList.add(taskScheduler diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java index 573724d8..1b2f4a11 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "HeartbeatController", description = "Check liveness of PRH service") public class HeartbeatController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class); /** * Endpoint for checking that PRH is alive. @@ -57,7 +57,7 @@ public class HeartbeatController { } ) public Mono<ResponseEntity<String>> heartbeat() { - logger.trace("Receiving heartbeat request"); + LOGGER.trace("Receiving heartbeat request"); return Mono.defer(() -> Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) ); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index 270fa584..9386b9e8 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class); private final SchedulerConfig schedulerConfig; @@ -52,14 +52,14 @@ public class ScheduleController { @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") public Mono<ResponseEntity<String>> startTasks() { - logger.trace("Receiving start scheduling worker request"); + LOGGER.trace("Receiving start scheduling worker request"); return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); } @RequestMapping(value = "stopPrh", method = RequestMethod.GET) @ApiOperation(value = "Receiving stop scheduling worker request") public Mono<ResponseEntity<String>> stopTask() { - logger.trace("Receiving stop scheduling worker request"); + LOGGER.trace("Receiving stop scheduling worker request"); return schedulerConfig.getResponseFromCancellationOfTasks(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 8742d872..a5ecc1dd 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ @@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser { private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; private static final String SOURCE_NAME = "sourceName"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. * @@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser { */ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message)) .flatMap(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromCallable(() -> new JsonParser().parse(message)); } private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) : getConsumerDmaapModelFromJsonArray(jsonElement); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java index 56ab484b..4f66e25c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java @@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; -public class HttpGetClient { +class HttpGetClient { - private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class); private final WebClient webClient; private final Gson gson; @@ -41,12 +41,12 @@ public class HttpGetClient { this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); } - HttpGetClient(WebClient webClient){ + HttpGetClient(WebClient webClient) { this.webClient = webClient; this.gson = new Gson(); } - public <T> Mono<T> callHttpGet(String url, Class<T> tClass) { + <T> Mono<T> callHttpGet(String url, Class<T> tClass) { return webClient .get() .uri(url) @@ -54,7 +54,7 @@ public class HttpGetClient { .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) .bodyToMono(String.class) - .flatMap(body->getJsonFromRequest(body,tClass)); + .flatMap(body -> getJsonFromRequest(body, tClass)); } private RuntimeException getException(ClientResponse response) { @@ -66,27 +66,26 @@ public class HttpGetClient { try { return Mono.just(parseJson(body, tClass)); } catch (JsonSyntaxException | IllegalStateException e) { - logger.warn("Converting string to json threw error ", e); return Mono.error(e); } } - private <T> T parseJson(String body, Class<T> tClass){ + private <T> T parseJson(String body, Class<T> tClass) { return gson.fromJson(body, tClass); } private static ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { - logger.info("Response status {}", clientResponse.statusCode()); + LOGGER.info("Response status {}", clientResponse.statusCode()); return Mono.just(clientResponse); }); } private static ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value))); return Mono.just(clientRequest); }); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java index 7af4a7c8..b346bf5e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java @@ -38,7 +38,7 @@ import java.net.URISyntaxException; @Service public class PrhConfigurationProvider { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class); private final HttpGetClient httpGetClient; @@ -56,12 +56,12 @@ public class PrhConfigurationProvider { } private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { - logger.info("Retrieving Config Binding Service endpoint from Consul"); + LOGGER.info("Retrieving Config Binding Service endpoint from Consul"); try { return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) .flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName())); } catch (URISyntaxException e) { - logger.warn("Malformed Consul uri", e); + LOGGER.warn("Malformed Consul uri", e); return Mono.error(e); } } @@ -72,7 +72,7 @@ public class PrhConfigurationProvider { } private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) { - logger.info("Retrieving PRH configuration"); + LOGGER.info("Retrieving PRH configuration"); return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); } @@ -86,7 +86,7 @@ public class PrhConfigurationProvider { return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); } catch (URISyntaxException e) { - logger.warn("Malformed Config Binding Service uri", e); + LOGGER.warn("Malformed Config Binding Service uri", e); return Mono.error(e); } } @@ -99,7 +99,7 @@ public class PrhConfigurationProvider { throw new IllegalStateException("JSON Array was empty"); } } catch (IllegalStateException e) { - logger.warn("Failed to retrieve JSON Object from array", e); + LOGGER.warn("Failed to retrieve JSON Object from array", e); return Mono.error(e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index f58fed61..5a05d374 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ public abstract class AaiProducerTask { - abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; + abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiProducerReactiveHttpClient resolveClient(); + abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException; protected abstract AaiClientConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) - throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException; - WebClient buildWebClient() { + WebClient buildWebClient() throws SSLException { return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index f5b8307b..7ccf75a6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -43,9 +45,8 @@ import reactor.core.publisher.Mono; public class AaiProducerTaskImpl extends AaiProducerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; @@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends } @Override - Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - + Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { + LOGGER.info("Publish to AAI DmaapModel"); return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response)) { - return consumerDmaapModel; + if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { + return Mono.just(consumerDmaapModel); } return Mono .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); @@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends } @Override - AaiProducerReactiveHttpClient resolveClient() { - return new AaiProducerReactiveHttpClient(resolveConfiguration()); + AaiProducerReactiveHttpClient resolveClient() throws SSLException { + return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient()); } @Override @@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends } @Override - protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } aaiProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a912ca9e..d322a43e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -21,7 +21,6 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; @@ -33,7 +32,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException; + abstract Mono<ConsumerDmaapModel> consume(Mono<String> message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -41,7 +40,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 9e1fadf1..0d4be08e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,17 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,11 +40,11 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(Config config) { @@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override public Mono<ConsumerDmaapModel> execute(String object) { - dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", object); + DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + LOGGER.debug(INVOKE, "Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 9a5813d1..7a121d5f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -33,15 +33,14 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; abstract DMaaPProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException; - WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } + abstract RestTemplate buildWebClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 73260381..733b8651 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -39,8 +41,8 @@ import reactor.core.publisher.Mono; @Component public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final Config config; private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - return consumerDmaapModel.flatMap(dmaapModel -> { - logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), - dmaapModel); - return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel); - }); + Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) { + return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } dmaapProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } @Override + RestTemplate buildWebClient() { + return new RestTemplate(); + } + + @Override protected DmaapPublisherConfiguration resolveConfiguration() { return config.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6432a338..f74bc56a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; @@ -72,24 +72,33 @@ public class ScheduledTasks { */ public void scheduleMainPrhEventTask() { MDCVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); + try { + logger.trace("Execution of tasks was registered"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromDMaaPMessage() + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + mainCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private void onComplete() { logger.info("PRH tasks have been completed"); } - private void onSuccess(String responseCode) { - MDC.put(RESPONSE_CODE, responseCode); - logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(ResponseEntity<String> responseCode) { + MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + responseCode.getStatusCode().value()); } private void onError(Throwable throwable) { @@ -98,24 +107,26 @@ public class ScheduledTasks { } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { - return () -> { + + private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Mono.defer(() -> { MDCVariables.setMdcContextMap(contextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); + logger.info("Init configs"); dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }; + }); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { try { return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); } catch (PrhTaskException e) { |