diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6ae8a3c7..16a6f8c5 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -54,6 +54,7 @@ public class ScheduledTasks { private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; + private final BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; /** @@ -64,11 +65,16 @@ public class ScheduledTasks { * @param aaiPublisherTask - second task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask, - AaiProducerTask aaiPublisherTask, Map<String, String> mdcContextMap) { + public ScheduledTasks( + DmaapConsumerTask dmaapConsumerTask, + DmaapPublisherTask dmaapPublisherTask, + AaiProducerTask aaiPublisherTask, + BbsActionsTask bbsActionsTask, + Map<String, String> mdcContextMap) { this.dmaapConsumerTask = dmaapConsumerTask; this.dmaapProducerTask = dmaapPublisherTask; this.aaiProducerTask = aaiPublisherTask; + this.bbsActionsTask = bbsActionsTask; this.mdcContextMap = mdcContextMap; } @@ -88,6 +94,9 @@ public class ScheduledTasks { .doOnError(exception -> logger.warn("AAIProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .flatMap(this::processAdditionalFields) + .doOnError(exception -> + logger.warn("BBSActionsTask exception has been registered: ", exception)) .flatMap(this::publishToDmaapConfiguration) .doOnError(exception -> logger.warn("DMaaPProducerTask exception has been registered: ", exception)) @@ -102,7 +111,6 @@ public class ScheduledTasks { } } - private void onComplete() { logger.info("PRH tasks have been completed"); } @@ -121,7 +129,6 @@ public class ScheduledTasks { } } - private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() { return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); @@ -148,6 +155,10 @@ public class ScheduledTasks { } } + private Mono<ConsumerDmaapModel> processAdditionalFields(ConsumerDmaapModel consumerDmaapModel) { + return bbsActionsTask.execute(consumerDmaapModel); + } + private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); |