diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
7 files changed, 79 insertions, 66 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index f58fed61..5a05d374 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ public abstract class AaiProducerTask { - abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; + abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiProducerReactiveHttpClient resolveClient(); + abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException; protected abstract AaiClientConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) - throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException; - WebClient buildWebClient() { + WebClient buildWebClient() throws SSLException { return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index f5b8307b..7ccf75a6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -43,9 +45,8 @@ import reactor.core.publisher.Mono; public class AaiProducerTaskImpl extends AaiProducerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; @@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends } @Override - Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - + Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { + LOGGER.info("Publish to AAI DmaapModel"); return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response)) { - return consumerDmaapModel; + if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { + return Mono.just(consumerDmaapModel); } return Mono .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); @@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends } @Override - AaiProducerReactiveHttpClient resolveClient() { - return new AaiProducerReactiveHttpClient(resolveConfiguration()); + AaiProducerReactiveHttpClient resolveClient() throws SSLException { + return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient()); } @Override @@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends } @Override - protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } aaiProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a912ca9e..d322a43e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -21,7 +21,6 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; @@ -33,7 +32,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException; + abstract Mono<ConsumerDmaapModel> consume(Mono<String> message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -41,7 +40,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; + protected abstract Mono<ConsumerDmaapModel> execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 9e1fadf1..0d4be08e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,17 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Map; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,11 +40,11 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(Config config) { @@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override public Mono<ConsumerDmaapModel> execute(String object) { - dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", object); + DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + LOGGER.debug(INVOKE, "Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 9a5813d1..7a121d5f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -33,15 +33,14 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; abstract DMaaPProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) + throws PrhTaskException; - WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } + abstract RestTemplate buildWebClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 73260381..733b8651 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -39,8 +41,8 @@ import reactor.core.publisher.Mono; @Component public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final Config config; private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @@ -50,25 +52,26 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - return consumerDmaapModel.flatMap(dmaapModel -> { - logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), - dmaapModel); - return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel); - }); + Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) { + return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } dmaapProducerReactiveHttpClient = resolveClient(); - logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel); + LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } @Override + RestTemplate buildWebClient() { + return new RestTemplate(); + } + + @Override protected DmaapPublisherConfiguration resolveConfiguration() { return config.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6432a338..f74bc56a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; @@ -72,24 +72,33 @@ public class ScheduledTasks { */ public void scheduleMainPrhEventTask() { MDCVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); + try { + logger.trace("Execution of tasks was registered"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromDMaaPMessage() + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + mainCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private void onComplete() { logger.info("PRH tasks have been completed"); } - private void onSuccess(String responseCode) { - MDC.put(RESPONSE_CODE, responseCode); - logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(ResponseEntity<String> responseCode) { + MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + responseCode.getStatusCode().value()); } private void onError(Throwable throwable) { @@ -98,24 +107,26 @@ public class ScheduledTasks { } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { - return () -> { + + private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Mono.defer(() -> { MDCVariables.setMdcContextMap(contextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); + logger.info("Init configs"); dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }; + }); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { try { return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); } catch (PrhTaskException e) { |