diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-09-25 12:24:48 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-10-08 07:50:46 +0200 |
commit | a122d0a0a7075163fad4865143fedf7b6fe511d1 (patch) | |
tree | ad3034f4f5bdc72a620e2404228925202ba74abe /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | |
parent | f245daa4b205846af33f7a8e088d203c39f24d52 (diff) |
PRH DMaaP objects batching
*Getting collection of object
in one request
*Refator the workflow
in the old implementation
Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f
Issue-ID: DCAEGEN2-834
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 1d121b38..aed99747 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { + public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .flatMap(this::getJsonParserMessage) + .flatMapMany(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } @@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser { : Mono.fromCallable(() -> new JsonParser().parse(message)); } - private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { + private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) + ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getConsumerDmaapModelFromJsonArray(jsonElement); } - private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { return create( - Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new))); + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) { + private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)) + .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { |