From fa56715424808348f2c3060dbbc2576e6950b774 Mon Sep 17 00:00:00 2001 From: Marcin Migdal Date: Wed, 27 Mar 2019 14:26:01 +0100 Subject: Support "Empty" from DMaap Simulator Change-Id: I1444071356cb8fb8a3393940bb004d6491d02324 Issue-ID: DCAEGEN2-1361 Signed-off-by: Marcin Migdal --- .../prh/service/DmaapConsumerJsonParser.java | 139 +++++++++++---------- 1 file changed, 72 insertions(+), 67 deletions(-) diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 531a8e5f..319b3511 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -20,26 +20,10 @@ package org.onap.dcaegen2.services.prh.service; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.Optional; -import java.util.stream.StreamSupport; - +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; @@ -52,7 +36,22 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_R import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import java.util.Optional; +import java.util.stream.StreamSupport; +import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** @@ -81,50 +80,56 @@ public class DmaapConsumerJsonParser { */ public Flux getJsonObject(Mono monoMessage) { return monoMessage - .flatMapMany(this::getConsumerDmaapModelFromJsonArray); + .flatMapMany(this::getConsumerDmaapModelFromJsonArray); } private Flux getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { - - LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}",jsonElement); - - if(jsonElement instanceof JsonPrimitive) { - LOGGER.debug("Response from DMaaP is Json primitive {}", jsonElement); + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement); + if (jsonElement instanceof JsonPrimitive) { + LOGGER.debug("Response from DMaaP is Json primitive"); return Flux.empty(); } - if(jsonElement instanceof JsonObject) + if (jsonElement instanceof JsonObject) { + LOGGER.debug("Response from DMaaP is JsonObject"); return create(Flux.just((JsonObject) jsonElement)); - - JsonArray jsonArray = (JsonArray) jsonElement; - if(jsonArray.size() == 0) { - LOGGER.debug("Nothing to consume from DMaaP"); - return Flux.empty(); } - return create( + + if (jsonElement instanceof JsonArray) { + LOGGER.debug("Response from DMaaP is JsonArray"); + JsonArray jsonArray = (JsonArray) jsonElement; + if (jsonArray.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); + } + return create( Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); + } + + LOGGER.debug("DmaapConsumerJsonParser input object type not recognized "); + return Flux.empty(); } public Optional getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } private Flux create(Flux jsonObject) { return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") - : transform(monoJsonP)) - .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); + !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") + : transform(monoJsonP)) + .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); } private Mono transform(JsonObject responseFromDmaap) { JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(COMMON_EVENT_HEADER); + .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(PNF_REGISTRATION_FIELDS); + .getAsJsonObject(PNF_REGISTRATION_FIELDS); this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); @@ -138,19 +143,19 @@ public class DmaapConsumerJsonParser { this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); return (StringUtils.isEmpty(pnfSourceName)) - ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage()) : - Mono.just(ImmutableConsumerDmaapModel.builder() - .correlationId(pnfSourceName) - .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address) - .serialNumber(pnfSerialNumberOptionalField) - .equipVendor(pnfEquipVendorOptionalField) - .equipModel(pnfEquipModelOptionalField) - .equipType(pnfEquipTypeOptionalField) - .nfRole(pnfNfRoleOptionalField) - .swVersion(pnfSwVersionOptionalField) - .additionalFields(pnfAdditionalFields).build()); + ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " + + printMessage()) : + Mono.just(ImmutableConsumerDmaapModel.builder() + .correlationId(pnfSourceName) + .ipv4(pnfOamIpv4Address) + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { @@ -163,20 +168,20 @@ public class DmaapConsumerJsonParser { private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," - + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT - + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, - this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, - this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, - this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields ); } -- cgit 1.2.3-korg