diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2018-12-10 01:38:41 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-12-10 01:38:41 +0000 |
commit | 0fe8d03a2f77d975809194f8399c979009d66fef (patch) | |
tree | 3ed3246169282cd65431358f7cfe1106444e3ffe /datafile-app-server/src/main | |
parent | 693026c6b973c44ade969516be496f966e4fae86 (diff) | |
parent | 33951a66970f3703d9a71e36adab8692f272817b (diff) |
Merge "Make DFC handle multiple messages from MR"
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java | 54 |
1 files changed, 29 insertions, 25 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e828776a..46c6e942 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser { * @return reactive Mono with an array of FileData */ public Flux<FileData> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getFileDataFromJsonArray(jsonElement); } private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { - return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - logger.trace("starting to getJsonObjectFromAnArray!"); - - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux<FileData> create(Mono<JsonObject> jsonObject) { - return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject)) - : transform(monoJsonP)); + private Flux<FileData> create(Flux<JsonObject> jsonObject) { + return jsonObject + .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transform(monoJsonP)); } private Flux<FileData> transform(JsonObject message) { @@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser { return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message)); + logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + return Flux.empty(); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. FileReady event has incorrect JsonObject")); + logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + return Flux.empty(); } private Optional<FileMetaData> getFileMetaData(JsonObject message) { @@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser { private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - if (arrayOfAdditionalFields.get(i) != null) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); - if (fileData.isPresent()) { - res.add(fileData.get()); - } + if (fileData.isPresent()) { + res.add(fileData.get()); } } return Flux.fromIterable(res); @@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser { private boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } + + private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } } |