summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2018-12-10 01:37:54 +0000
committerGerrit Code Review <gerrit@onap.org>2018-12-10 01:37:54 +0000
commit289448b5c2da86c7c9255e84ea62e0b703f6117d (patch)
treee1bcd8a3babf2a813ad7368273b5f7df53bd5df2 /datafile-app-server/src/main
parent67dd4223056a2c6a56ffef34076db7dc1a2740b0 (diff)
parent03cc930719ef77373dacfbb151f11f42fc01f0ea (diff)
Merge "Make DFC handle multiple messages from MR DMaaP" into casablanca3.0.1-ONAP1.0.5casablanca
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java121
1 files changed, 61 insertions, 60 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index cfd06db3..78c5e98d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.slf4j.Logger;
@@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
- private Flux<FileData> transform(JsonObject jsonObject) {
- if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
- JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
- String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
- String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
- String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
- JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
- if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
+ private Flux<FileData> transform(JsonObject message) {
+ JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
+ String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
+ String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
+ String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
+ JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
+
+ if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
&& arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
- }
+ return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
+ }
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ logger.error("FileReady event header is missing information. {}", message);
+ } else if (arrayOfNamedHashMap != null) {
+ logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message);
}
- return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ return Flux.empty();
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
-
- if (fileData != null) {
- res.add(fileData);
- } else {
- logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
- }
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
+
+ if (fileData != null) {
+ res.add(fileData);
+ } else {
+ logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
}
}
return Flux.fromIterable(res);
@@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
- fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ // @formatter:off
+ fileData = ImmutableFileData.builder()
+ .name(name)
+ .changeIdentifier(changeIdentifier)
+ .changeType(changeType)
+ .location(location)
+ .compression(compression)
+ .fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion)
+ .build();
+ // @formatter:on
}
return fileData;
}
@@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
- private boolean containsHeader(JsonObject jsonObject) {
+ private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
- return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
- }
-
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}