diff options
Diffstat (limited to 'prh-app-server/src/main/java')
4 files changed, 36 insertions, 18 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 1d121b38..aed99747 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { + public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .flatMap(this::getJsonParserMessage) + .flatMapMany(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } @@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser { : Mono.fromCallable(() -> new JsonParser().parse(message)); } - private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { + private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) + ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getConsumerDmaapModelFromJsonArray(jsonElement); } - private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { return create( - Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new))); + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) { + private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)) + .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a6baf4a5..4cde2257 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -32,7 +33,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message); + abstract Flux<ConsumerDmaapModel> consume(Mono<String> message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -40,7 +41,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object); + protected abstract Flux<ConsumerDmaapModel> execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 341a229b..3a5f213c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -54,12 +55,12 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } @Override - Mono<ConsumerDmaapModel> consume(Mono<String> message) { + Flux<ConsumerDmaapModel> consume(Mono<String> message) { return dmaapConsumerJsonParser.getJsonObject(message); } @Override - public Mono<ConsumerDmaapModel> execute(String object) { + public Flux<ConsumerDmaapModel> execute(String object) { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index de7837ec..08767428 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; @@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -83,7 +85,13 @@ public class ScheduledTasks { logger.warn("Nothing to consume from DMaaP") ) .flatMap(this::publishToAaiConfiguration) + .doOnError(exception -> + logger.warn("AAIProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .flatMap(this::publishToDmaapConfiguration) + .doOnError(exception -> + logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .doOnTerminate(mainCountDownLatch::countDown) .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -113,8 +121,8 @@ public class ScheduledTasks { } - private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { - return Mono.defer(() -> { + private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); logger.info(INVOKE, "Init configs"); @@ -138,4 +146,8 @@ public class ScheduledTasks { return Mono.error(e); } } + + private Predicate<Throwable> resumePrhPredicate() { + return exception -> exception instanceof PrhTaskException; + } } |