diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index de7837ec..08767428 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; @@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -83,7 +85,13 @@ public class ScheduledTasks { logger.warn("Nothing to consume from DMaaP") ) .flatMap(this::publishToAaiConfiguration) + .doOnError(exception -> + logger.warn("AAIProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .flatMap(this::publishToDmaapConfiguration) + .doOnError(exception -> + logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .doOnTerminate(mainCountDownLatch::countDown) .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -113,8 +121,8 @@ public class ScheduledTasks { } - private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { - return Mono.defer(() -> { + private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); logger.info(INVOKE, "Init configs"); @@ -138,4 +146,8 @@ public class ScheduledTasks { return Mono.error(e); } } + + private Predicate<Throwable> resumePrhPredicate() { + return exception -> exception instanceof PrhTaskException; + } } |