aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-09-25 12:24:48 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-10-08 07:50:46 +0200
commita122d0a0a7075163fad4865143fedf7b6fe511d1 (patch)
treead3034f4f5bdc72a620e2404228925202ba74abe /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
parentf245daa4b205846af33f7a8e088d203c39f24d52 (diff)
PRH DMaaP objects batching
*Getting collection of object in one request *Refator the workflow in the old implementation Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f Issue-ID: DCAEGEN2-834 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index de7837ec..08767428 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -83,7 +85,13 @@ public class ScheduledTasks {
logger.warn("Nothing to consume from DMaaP")
)
.flatMap(this::publishToAaiConfiguration)
+ .doOnError(exception ->
+ logger.warn("AAIProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.flatMap(this::publishToDmaapConfiguration)
+ .doOnError(exception ->
+ logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.doOnTerminate(mainCountDownLatch::countDown)
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -113,8 +121,8 @@ public class ScheduledTasks {
}
- private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
- return Mono.defer(() -> {
+ private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
logger.info(INVOKE, "Init configs");
@@ -138,4 +146,8 @@ public class ScheduledTasks {
return Mono.error(e);
}
}
+
+ private Predicate<Throwable> resumePrhPredicate() {
+ return exception -> exception instanceof PrhTaskException;
+ }
}