aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java28
1 files changed, 16 insertions, 12 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 1d121b38..aed99747 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .flatMap(this::getJsonParserMessage)
+ .flatMapMany(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
@@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser {
: Mono.fromCallable(() -> new JsonParser().parse(message));
}
- private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
+ ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
- private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
return create(
- Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) {
+ private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP ->
- !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP))
+ .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {