From 33951a66970f3703d9a71e36adab8692f272817b Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Fri, 30 Nov 2018 10:06:54 +0100 Subject: Make DFC handle multiple messages from MR DMaaP MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC. Change-Id: I78c2fc7f4512a07fadf61c2cf1f6399d466fc873 Issue-ID: DCAEGEN2-1001 Signed-off-by: elinuxhenrik --- .../datafile/service/DmaapConsumerJsonParser.java | 54 ++++++++++++---------- 1 file changed, 29 insertions(+), 25 deletions(-) (limited to 'datafile-app-server/src/main/java') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e828776a..46c6e942 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser { * @return reactive Mono with an array of FileData */ public Flux getJsonObject(Mono rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getFileDataFromJsonArray(jsonElement); } private Flux getFileDataFromJsonArray(JsonElement jsonElement) { - return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional getJsonObjectFromAnArray(JsonElement element) { - logger.trace("starting to getJsonObjectFromAnArray!"); - - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux create(Mono jsonObject) { - return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject)) - : transform(monoJsonP)); + private Flux create(Flux jsonObject) { + return jsonObject + .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transform(monoJsonP)); } private Flux transform(JsonObject message) { @@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser { return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message)); + logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + return Flux.empty(); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. FileReady event has incorrect JsonObject")); + logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + return Flux.empty(); } private Optional getFileMetaData(JsonObject message) { @@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser { private Flux getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - if (arrayOfAdditionalFields.get(i) != null) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional fileData = getFileDataFromJson(fileMetaData, fileInfo); + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + Optional fileData = getFileDataFromJson(fileMetaData, fileInfo); - if (fileData.isPresent()) { - res.add(fileData.get()); - } + if (fileData.isPresent()) { + res.add(fileData.get()); } } return Flux.fromIterable(res); @@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser { private boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } + + private Flux logErrorAndReturnEmptyFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } } -- cgit 1.2.3-korg