aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-09-25 12:24:48 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-10-08 07:50:46 +0200
commita122d0a0a7075163fad4865143fedf7b6fe511d1 (patch)
treead3034f4f5bdc72a620e2404228925202ba74abe /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
parentf245daa4b205846af33f7a8e088d203c39f24d52 (diff)
PRH DMaaP objects batching
*Getting collection of object in one request *Refator the workflow in the old implementation Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f Issue-ID: DCAEGEN2-834 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
1 files changed, 3 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index a6baf4a5..4cde2257 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -32,7 +33,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
+ abstract Flux<ConsumerDmaapModel> consume(Mono<String> message);
abstract DMaaPConsumerReactiveHttpClient resolveClient();
@@ -40,7 +41,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(String object);
+ protected abstract Flux<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
return new DMaaPReactiveWebClient().build();