summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java19
1 files changed, 15 insertions, 4 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6ae8a3c7..16a6f8c5 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -54,6 +54,7 @@ public class ScheduledTasks {
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
private final AaiProducerTask aaiProducerTask;
+ private final BbsActionsTask bbsActionsTask;
private Map<String, String> mdcContextMap;
/**
@@ -64,11 +65,16 @@ public class ScheduledTasks {
* @param aaiPublisherTask - second task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask,
- AaiProducerTask aaiPublisherTask, Map<String, String> mdcContextMap) {
+ public ScheduledTasks(
+ DmaapConsumerTask dmaapConsumerTask,
+ DmaapPublisherTask dmaapPublisherTask,
+ AaiProducerTask aaiPublisherTask,
+ BbsActionsTask bbsActionsTask,
+ Map<String, String> mdcContextMap) {
this.dmaapConsumerTask = dmaapConsumerTask;
this.dmaapProducerTask = dmaapPublisherTask;
this.aaiProducerTask = aaiPublisherTask;
+ this.bbsActionsTask = bbsActionsTask;
this.mdcContextMap = mdcContextMap;
}
@@ -88,6 +94,9 @@ public class ScheduledTasks {
.doOnError(exception ->
logger.warn("AAIProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
+ .flatMap(this::processAdditionalFields)
+ .doOnError(exception ->
+ logger.warn("BBSActionsTask exception has been registered: ", exception))
.flatMap(this::publishToDmaapConfiguration)
.doOnError(exception ->
logger.warn("DMaaPProducerTask exception has been registered: ", exception))
@@ -102,7 +111,6 @@ public class ScheduledTasks {
}
}
-
private void onComplete() {
logger.info("PRH tasks have been completed");
}
@@ -121,7 +129,6 @@ public class ScheduledTasks {
}
}
-
private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
@@ -148,6 +155,10 @@ public class ScheduledTasks {
}
}
+ private Mono<ConsumerDmaapModel> processAdditionalFields(ConsumerDmaapModel consumerDmaapModel) {
+ return bbsActionsTask.execute(consumerDmaapModel);
+ }
+
private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);