diff options
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java | 71 |
1 files changed, 47 insertions, 24 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index cfd06db3..29885f99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -60,6 +60,9 @@ public class DmaapConsumerJsonParser { private static final String FILE_FORMAT_TYPE = "fileFormatType"; private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; + private static final String FILE_READY_CHANGE_TYPE = "FileReady"; + private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + /** * Extract info from string and create @see {@link FileData}. * @@ -73,17 +76,17 @@ public class DmaapConsumerJsonParser { private Mono<JsonElement> getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + : getFileDataFromJsonArray(jsonElement); } private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -94,8 +97,8 @@ public class DmaapConsumerJsonParser { private Flux<FileData> create(Mono<JsonObject> jsonObject) { return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); } private Flux<FileData> transform(JsonObject jsonObject) { @@ -106,26 +109,28 @@ public class DmaapConsumerJsonParser { String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) - && arrayOfNamedHashMap != null) { + && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier) + && isChangeTypeCorrect(changeType)) { return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); } - if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Flux.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); - } else if (arrayOfNamedHashMap != null) { - return Flux.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); - } - return Flux.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap, + jsonObject); } return Flux.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + } + + private boolean isChangeTypeCorrect(String changeType) { + return FILE_READY_CHANGE_TYPE.equals(changeType); + } + + private boolean isChangeIdentifierCorrect(String changeIdentifier) { + return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { if (arrayOfAdditionalFields.get(i) != null) { @@ -155,10 +160,10 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build(); } return fileData; } @@ -168,9 +173,9 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType) - && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); + && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { @@ -178,8 +183,8 @@ public class DmaapConsumerJsonParser { } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { - return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) && - isStringIsNotNullAndNotEmpty(compression); + return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) + && isStringIsNotNullAndNotEmpty(compression); } private boolean containsHeader(JsonObject jsonObject) { @@ -193,4 +198,22 @@ public class DmaapConsumerJsonParser { private boolean isStringIsNotNullAndNotEmpty(String string) { return string != null && !string.isEmpty(); } + + private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion, + JsonArray arrayOfNamedHashMap, JsonObject jsonObject) { + String errorMessage = "FileReady event information is incomplete or incorrect!\n"; + if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { + errorMessage += "header is missing.\n"; + } + if (arrayOfNamedHashMap == null) { + errorMessage += "arrayOfNamedHashMap is missing.\n"; + } + if (!isChangeIdentifierCorrect(changeIdentifier)) { + errorMessage += "changeIdentifier is incorrect.\n"; + } + if (!isChangeTypeCorrect(changeType)) { + errorMessage += "changeType is incorrect.\n"; + } + return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject)); + } } |