diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-06-25 12:29:39 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-06 14:22:18 +0200 |
commit | 58cbfef5661242a2523b7a183a664498fd1f405a (patch) | |
tree | 68fd4d654d43124e8450526fa4a54c6e4a345322 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | |
parent | 2cfcc6756e59ed8cda571efa8b29764eab7837c8 (diff) |
Cleaned in code in reactive tasks
*Formated code
*Added handling exceptions
*Deleted unused code
Change-Id: I3e95bcb8ba7cdf85f6a1daaec7cadc86080e0b10
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 64 |
1 files changed, 49 insertions, 15 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index addeaae2..cf096b7b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,12 +19,16 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.util.concurrent.Callable; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -49,22 +53,52 @@ public class ScheduledTasks { public void scheduleMainPrhEventTask() { logger.trace("Execution of tasks was registered"); - Mono.fromSupplier(() -> Mono.fromCallable(() -> + Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) + .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) + .flatMap(this::publishToAAIConfiguration) + .flatMap(this::publishToDMaaPConfiguration) + .subscribeOn(Schedulers.elastic()); + + dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + } + + private void onComplete() { + logger.info("PRH tasks have been completed"); + } + + private void onSuccess(Integer responseCode) { + logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + } + + private void onError(Throwable throwable) { + if (!(throwable instanceof DmaapEmptyResponseException)) { + logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + } + } + + private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return () -> { dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }).subscribe(consumerDmaapModel -> Mono - .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel)) - .subscribe( - aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel)) - .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp), - error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error), - () -> logger.info("Completed DmaapPublisher task"))), - errorResponse -> logger - .warn("Error has been thrown in AAIProducerTask: {}", errorResponse) - , () -> logger.info("Completed AAIProducer task"))) - .subscribe(Disposable::dispose, tasksError -> logger - .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError) - , () -> logger.info("PRH tasks was consumed properly")).dispose(); + }; + } + + private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) { + try { + return Mono.just(aaiProducerTask.execute(dmaapModel)); + } catch (PrhTaskException e) { + logger.warn("Exception in A&AIProducer task ", e); + return Mono.error(e); + } + } + + private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) { + try { + return Mono.just(dmaapProducerTask.execute(aaiModel)); + } catch (PrhTaskException e) { + logger.warn("Exception in DMaaPProducer task ", e); + return Mono.error(e); + } } } |