summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java71
1 files changed, 47 insertions, 24 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index cfd06db3..29885f99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -60,6 +60,9 @@ public class DmaapConsumerJsonParser {
private static final String FILE_FORMAT_TYPE = "fileFormatType";
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
+ private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+ private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
/**
* Extract info from string and create @see {@link FileData}.
*
@@ -73,17 +76,17 @@ public class DmaapConsumerJsonParser {
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -94,8 +97,8 @@ public class DmaapConsumerJsonParser {
private Flux<FileData> create(Mono<JsonObject> jsonObject) {
return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject jsonObject) {
@@ -106,26 +109,28 @@ public class DmaapConsumerJsonParser {
String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
- && arrayOfNamedHashMap != null) {
+ && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier)
+ && isChangeTypeCorrect(changeType)) {
return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
}
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap,
+ jsonObject);
}
return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ }
+
+ private boolean isChangeTypeCorrect(String changeType) {
+ return FILE_READY_CHANGE_TYPE.equals(changeType);
+ }
+
+ private boolean isChangeIdentifierCorrect(String changeIdentifier) {
+ return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
if (arrayOfAdditionalFields.get(i) != null) {
@@ -155,10 +160,10 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ .location(location).compression(compression).fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion).build();
}
return fileData;
}
@@ -168,9 +173,9 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,8 +183,8 @@ public class DmaapConsumerJsonParser {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
private boolean containsHeader(JsonObject jsonObject) {
@@ -193,4 +198,22 @@ public class DmaapConsumerJsonParser {
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion,
+ JsonArray arrayOfNamedHashMap, JsonObject jsonObject) {
+ String errorMessage = "FileReady event information is incomplete or incorrect!\n";
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ errorMessage += "header is missing.\n";
+ }
+ if (arrayOfNamedHashMap == null) {
+ errorMessage += "arrayOfNamedHashMap is missing.\n";
+ }
+ if (!isChangeIdentifierCorrect(changeIdentifier)) {
+ errorMessage += "changeIdentifier is incorrect.\n";
+ }
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += "changeType is incorrect.\n";
+ }
+ return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject));
+ }
}