aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-26 15:15:03 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-07 08:21:02 +0200
commit8b1502fb0f1af5d00ec26e712e57b792fbd16bd8 (patch)
tree7cc80c278f17710863e6d865df77c5edfa6d4fbc /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
parente17c2d89d0470501fa60ed487726b0bbf3305f8c (diff)
Added dmaapReactiveConsumer
*Tests have not been ready yet Change-Id: I2e1d9c4218f91ae2f066b28acdbaa1870d7d27e7 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java36
1 files changed, 20 insertions, 16 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index cf096b7b..37b8686e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -55,7 +55,7 @@ public class ScheduledTasks {
Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
.doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .flatMap(this::publishToAAIConfiguration)
+ .map(this::publishToAAIConfiguration)
.flatMap(this::publishToDMaaPConfiguration)
.subscribeOn(Schedulers.elastic());
@@ -76,7 +76,7 @@ public class ScheduledTasks {
}
}
- private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -84,21 +84,25 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
- try {
- return Mono.just(aaiProducerTask.execute(dmaapModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in A&AIProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ return monoDMaaPModel.flatMap(dmaapModel -> {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
- private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
- try {
- return Mono.just(dmaapProducerTask.execute(aaiModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+ return monoAAIModel.flatMap(aaiModel -> {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
}