diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-06-22 20:11:20 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-06 08:51:56 +0200 |
commit | 2cfcc6756e59ed8cda571efa8b29764eab7837c8 (patch) | |
tree | e99375044d7ca9a9cac0962e459cafd5f580b094 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | |
parent | a684d478f8b81bba83123d4f1fd1ec3c29df73ca (diff) |
Added reactive tasks flow control
Change-Id: I9cb2bede66e9e446912f2e6a815c7b56b80813b9
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index f7767101..addeaae2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,11 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,19 +47,24 @@ public class ScheduledTasks { } public void scheduleMainPrhEventTask() { - logger.trace("Execution of task was registered"); - setTaskExecutionFlow(); - try { - dmaapConsumerTask.initConfigs(); - dmaapConsumerTask.receiveRequest(""); - } catch (PrhTaskException e) { - logger - .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e); - } - } + logger.trace("Execution of tasks was registered"); - private void setTaskExecutionFlow() { - dmaapConsumerTask.setNext(aaiProducerTask); - aaiProducerTask.setNext(dmaapProducerTask); + Mono.fromSupplier(() -> Mono.fromCallable(() -> + { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); + }).subscribe(consumerDmaapModel -> Mono + .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel)) + .subscribe( + aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel)) + .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp), + error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error), + () -> logger.info("Completed DmaapPublisher task"))), + errorResponse -> logger + .warn("Error has been thrown in AAIProducerTask: {}", errorResponse) + , () -> logger.info("Completed AAIProducer task"))) + .subscribe(Disposable::dispose, tasksError -> logger + .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError) + , () -> logger.info("PRH tasks was consumed properly")).dispose(); } } |