aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index de7837ec..08767428 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -83,7 +85,13 @@ public class ScheduledTasks {
logger.warn("Nothing to consume from DMaaP")
)
.flatMap(this::publishToAaiConfiguration)
+ .doOnError(exception ->
+ logger.warn("AAIProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.flatMap(this::publishToDmaapConfiguration)
+ .doOnError(exception ->
+ logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.doOnTerminate(mainCountDownLatch::countDown)
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -113,8 +121,8 @@ public class ScheduledTasks {
}
- private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
- return Mono.defer(() -> {
+ private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
logger.info(INVOKE, "Init configs");
@@ -138,4 +146,8 @@ public class ScheduledTasks {
return Mono.error(e);
}
}
+
+ private Predicate<Throwable> resumePrhPredicate() {
+ return exception -> exception instanceof PrhTaskException;
+ }
}