aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-09-25 12:24:48 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-10-08 07:50:46 +0200
commita122d0a0a7075163fad4865143fedf7b6fe511d1 (patch)
treead3034f4f5bdc72a620e2404228925202ba74abe /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
parentf245daa4b205846af33f7a8e088d203c39f24d52 (diff)
PRH DMaaP objects batching
*Getting collection of object in one request *Refator the workflow in the old implementation Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f Issue-ID: DCAEGEN2-834 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java28
1 files changed, 16 insertions, 12 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 1d121b38..aed99747 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .flatMap(this::getJsonParserMessage)
+ .flatMapMany(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
@@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser {
: Mono.fromCallable(() -> new JsonParser().parse(message));
}
- private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
+ ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
- private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
return create(
- Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) {
+ private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP ->
- !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP))
+ .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {