aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-25 12:29:39 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-06 14:22:18 +0200
commit58cbfef5661242a2523b7a183a664498fd1f405a (patch)
tree68fd4d654d43124e8450526fa4a54c6e4a345322 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
parent2cfcc6756e59ed8cda571efa8b29764eab7837c8 (diff)
Cleaned in code in reactive tasks
*Formated code *Added handling exceptions *Deleted unused code Change-Id: I3e95bcb8ba7cdf85f6a1daaec7cadc86080e0b10 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java64
1 files changed, 49 insertions, 15 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index addeaae2..cf096b7b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -19,12 +19,16 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.concurrent.Callable;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.Disposable;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -49,22 +53,52 @@ public class ScheduledTasks {
public void scheduleMainPrhEventTask() {
logger.trace("Execution of tasks was registered");
- Mono.fromSupplier(() -> Mono.fromCallable(() ->
+ Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+ .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
+ .flatMap(this::publishToAAIConfiguration)
+ .flatMap(this::publishToDMaaPConfiguration)
+ .subscribeOn(Schedulers.elastic());
+
+ dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ private void onComplete() {
+ logger.info("PRH tasks have been completed");
+ }
+
+ private void onSuccess(Integer responseCode) {
+ logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ }
+
+ private void onError(Throwable throwable) {
+ if (!(throwable instanceof DmaapEmptyResponseException)) {
+ logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+ }
+ }
+
+ private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return () ->
{
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- }).subscribe(consumerDmaapModel -> Mono
- .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
- .subscribe(
- aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
- .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
- error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
- () -> logger.info("Completed DmaapPublisher task"))),
- errorResponse -> logger
- .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
- , () -> logger.info("Completed AAIProducer task")))
- .subscribe(Disposable::dispose, tasksError -> logger
- .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
- , () -> logger.info("PRH tasks was consumed properly")).dispose();
+ };
+ }
+
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
}
}