diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | 67 |
1 files changed, 28 insertions, 39 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 43d6922a..b3d84562 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -20,6 +20,23 @@ package org.onap.dcaegen2.services.prh.service; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.stream.StreamSupport; + import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; @@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 @@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) { - return monoMessage - .flatMapMany(this::getConsumerDmaapModelFromJsonArray); + public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) { + return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { - LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement); - - if (jsonElement instanceof JsonObject) { - LOGGER.debug("Element is JsonObject"); - return create(Flux.just((JsonObject) jsonElement)); - } + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) { + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); - if (jsonElement instanceof JsonArray) { - LOGGER.debug("Element is JsonArray"); - JsonArray jsonArray = (JsonArray) jsonElement; - if (jsonArray.size() == 0) { - LOGGER.debug("Nothing to consume from DMaaP"); - return Flux.empty(); - } - return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + if (items.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); } - - LOGGER.warn("Element is neither JSON Object or Array"); - return Flux.empty(); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { |