diff options
author | sangeeta.bellara <sangeeta.bellara@t-systems.com> | 2023-03-09 22:13:03 +0530 |
---|---|---|
committer | Sangeeta Bellara <sangeeta.bellara@t-systems.com> | 2023-03-28 06:26:20 +0000 |
commit | e1e4154a3914f0878e26ea2e1683ebdfe507f5a9 (patch) | |
tree | 6dc21739b666f486daab905de63f2ca5e1aa72d9 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | |
parent | fb93a46a63c859434cecdbd07c0e08531c366855 (diff) |
PRH Code Additions for Early PNF registrations1.9.0
Issue-ID: DCAEGEN2-3312
Change-Id: Id9b1ca83390af3675e26fc61ccc8d12611ab8ddf
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Change-Id: I9bc25bc1343c40aca5644de3fd30f7c2142c1a47
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | 173 |
1 files changed, 114 insertions, 59 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index f98e952f..25c380fb 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,43 +18,48 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.service; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.collection.List; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.configurationprocessor.json.JSONArray; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.boot.configurationprocessor.json.JSONObject; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; + + + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 @@ -74,6 +80,8 @@ public class DmaapConsumerJsonParser { private String pnfSwVersionOptionalField; private JsonObject pnfAdditionalFields; + private String sourceName; + /** * Extract info from string and create @see {@link ConsumerDmaapModel}. * @@ -84,6 +92,11 @@ public class DmaapConsumerJsonParser { return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } + public JSONObject getJsonObjectKafka(String jsonStr) throws JSONException { + return new JSONObject(jsonStr); + } + + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(List<JsonElement> items) { LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); @@ -97,25 +110,59 @@ public class DmaapConsumerJsonParser { .orElseGet(JsonObject::new))))); } + /** + * Extract info from string and create @see {@link ConsumerDmaapModel}. + * + * @param monoMessage - results from Kafka + * @return reactive DMaaPModel + * + */ + /** + * @author <a href="mailto:shilpa.urade@t-systems.com">Shilpa Urade</a> on 13/3/23 + */ + + public Flux<ConsumerDmaapModel> getConsumerDmaapModelFromKafkaConsumerRecord(java.util.List<String> items) + { + LOGGER.info("DmaapConsumerJsonParser input for parsing: {} with commit", items); + if (items.size() == 0) { + LOGGER.info("Nothing to consume from Kafka"); + return Flux.empty(); + } + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonObjectFromString -> getJsonObjectFromString(jsonObjectFromString) + .orElseGet(JsonObject::new))))); + } + + Optional<JsonObject> getJsonObjectFromString(String element) { + return Optional.ofNullable(JsonParser.parseString(element).getAsJsonObject()); + } + + public String getSourceName() { + return sourceName; + } + Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } + + Optional<JsonObject> getJsonObjectFromKafkaRecords(String element) { + return Optional.ofNullable(new JsonObject().getAsJsonObject(element)); } + private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") - : transform(monoJsonP)) - .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); + return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") + : transform(monoJsonP)); } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(COMMON_EVENT_HEADER); + .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(PNF_REGISTRATION_FIELDS); - + .getAsJsonObject(PNF_REGISTRATION_FIELDS); this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); @@ -126,21 +173,20 @@ public class DmaapConsumerJsonParser { this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE); this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION); this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); - return (StringUtils.isEmpty(pnfSourceName)) - ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage()) : - Mono.just(ImmutableConsumerDmaapModel.builder() - .correlationId(pnfSourceName) - .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address) - .serialNumber(pnfSerialNumberOptionalField) - .equipVendor(pnfEquipVendorOptionalField) - .equipModel(pnfEquipModelOptionalField) - .equipType(pnfEquipTypeOptionalField) - .nfRole(pnfNfRoleOptionalField) - .swVersion(pnfSwVersionOptionalField) - .additionalFields(pnfAdditionalFields).build()); + ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " + + printMessage()) : + Mono.just(ImmutableConsumerDmaapModel.builder() + .correlationId(pnfSourceName) + .ipv4(pnfOamIpv4Address) + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { @@ -148,30 +194,39 @@ public class DmaapConsumerJsonParser { } private boolean containsHeader(JsonObject jsonObject) { - return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + try { + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + }catch(Exception e){ + LOGGER.info("Fetching an error in containsHeader method {}",e.getMessage()); + } + return false; } private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," - + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT - + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, - this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, - this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, - this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields ); } private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) { - LOGGER.warn(messageForLogger); + LOGGER.info(messageForLogger); return Mono.empty(); } + + public JSONArray getJsonArray(String value) throws JSONException { + return new JSONArray(value); + } } |