From 03cc930719ef77373dacfbb151f11f42fc01f0ea Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Mon, 3 Dec 2018 12:50:45 +0100 Subject: Make DFC handle multiple messages from MR DMaaP MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC. Change-Id: I3bd1f62d68351d7149b94c9d6331777d485a7d53 Issue-ID: DCAEGEN2-1001 Signed-off-by: elinuxhenrik --- .../datafile/service/DmaapConsumerJsonParser.java | 121 +++++++++++---------- 1 file changed, 61 insertions(+), 60 deletions(-) (limited to 'datafile-app-server/src/main') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index cfd06db3..78c5e98d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.slf4j.Logger; @@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser { * @return reactive Mono with an array of FileData */ public Flux getJsonObject(Mono rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) + : getFileDataFromJsonArray(jsonElement); } private Flux getFileDataFromJsonArray(JsonElement jsonElement) { - return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional getJsonObjectFromAnArray(JsonElement element) { - logger.trace("starting to getJsonObjectFromAnArray!"); - - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux create(Mono jsonObject) { - return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + private Flux create(Flux jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transform(monoJsonP)); } - private Flux transform(JsonObject jsonObject) { - if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { - JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); - String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); - String changeType = getValueFromJson(notificationFields, CHANGE_TYPE); - String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); - JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); - if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) + private Flux transform(JsonObject message) { + JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); + String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); + String changeType = getValueFromJson(notificationFields, CHANGE_TYPE); + String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); + JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); + + if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) && arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); - } + return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); + } - if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Flux.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); - } else if (arrayOfNamedHashMap != null) { - return Flux.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); - } - return Flux.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { + logger.error("FileReady event header is missing information. {}", message); + } else if (arrayOfNamedHashMap != null) { + logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message); } - return Flux.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + return Flux.empty(); } private Flux getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - if (arrayOfAdditionalFields.get(i) != null) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType); - - if (fileData != null) { - res.add(fileData); - } else { - logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); - } + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType); + + if (fileData != null) { + res.add(fileData); + } else { + logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); } } return Flux.fromIterable(res); @@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { - fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + // @formatter:off + fileData = ImmutableFileData.builder() + .name(name) + .changeIdentifier(changeIdentifier) + .changeType(changeType) + .location(location) + .compression(compression) + .fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion) + .build(); + // @formatter:on } return fileData; } @@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType) - && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); + && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { @@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser { } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { - return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) && - isStringIsNotNullAndNotEmpty(compression); + return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) + && isStringIsNotNullAndNotEmpty(compression); } - private boolean containsHeader(JsonObject jsonObject) { + private boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) { - return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header); - } - private boolean isStringIsNotNullAndNotEmpty(String string) { return string != null && !string.isEmpty(); } + + private Flux logErrorAndReturnEmptyFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } } -- cgit 1.2.3-korg