diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | 128 |
1 files changed, 83 insertions, 45 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 4749b520..11939b53 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,32 +23,57 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; +import io.vavr.collection.List; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ +@Component public class DmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); - private static final String EVENT = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields"; - private static final String OAM_IPV_4_ADDRESS = "oamV4IpAddress"; - private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; - private static final String SOURCE_NAME = "sourceName"; - private static final String CORRELATION_ID = "correlationId"; + private String pnfSourceName; + private String pnfOamIpv4Address; + private String pnfOamIpv6Address; + private String pnfSerialNumberOptionalField; + private String pnfEquipVendorOptionalField; + private String pnfEquipModelOptionalField; + private String pnfEquipTypeOptionalField; + private String pnfNfRoleOptionalField; + private String pnfSwVersionOptionalField; + private JsonObject pnfAdditionalFields; /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. @@ -56,31 +81,24 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { - return monoMessage - .flatMapMany(this::getJsonParserMessage) - .flatMap(this::createJsonConsumerModel); + public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) { + return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } - private Mono<JsonElement> getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty") - : Mono.fromCallable(() -> new JsonParser().parse(message)); - } + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(List<JsonElement> items) { + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); - private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() - ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getConsumerDmaapModelFromJsonArray(jsonElement); - } - - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + if (items.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); + } return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); @@ -94,43 +112,63 @@ public class DmaapConsumerJsonParser { } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { - JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) .getAsJsonObject(PNF_REGISTRATION_FIELDS); - String pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); - String pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); - String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); - - return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address)) + this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); + this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); + this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); + this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); + this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER); + this.pnfEquipVendorOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_VENDOR); + this.pnfEquipModelOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_MODEL); + this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE); + this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION); + this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); + + return (StringUtils.isEmpty(pnfSourceName)) ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) : + + printMessage()) : Mono.just(ImmutableConsumerDmaapModel.builder() .correlationId(pnfSourceName) .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address).build()); + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : ""; } - private boolean ipPropertiesNotEmpty(String ipv4, String ipv6) { - return (!StringUtils.isEmpty(ipv4)) || !(StringUtils.isEmpty(ipv6)); - } - private boolean containsHeader(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); } - private String printMessage(String sourceName, String oamIpv4Address, String oamIpv6Address) { + private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + "\": \"%s\"," - + "\"" + OAM_IPV_4_ADDRESS + "\": \"%s\"," - + "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\"" - + "%n}", sourceName, oamIpv4Address, oamIpv6Address); + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + ); } private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) { |