aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java5
1 files changed, 3 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 341a229b..3a5f213c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -54,12 +55,12 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
@Override
- Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ Flux<ConsumerDmaapModel> consume(Mono<String> message) {
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
- public Mono<ConsumerDmaapModel> execute(String object) {
+ public Flux<ConsumerDmaapModel> execute(String object) {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());