aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2018-12-10 01:38:41 +0000
committerGerrit Code Review <gerrit@onap.org>2018-12-10 01:38:41 +0000
commit0fe8d03a2f77d975809194f8399c979009d66fef (patch)
tree3ed3246169282cd65431358f7cfe1106444e3ffe /datafile-app-server/src/main/java
parent693026c6b973c44ade969516be496f966e4fae86 (diff)
parent33951a66970f3703d9a71e36adab8692f272817b (diff)
Merge "Make DFC handle multiple messages from MR"
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java54
1 files changed, 29 insertions, 25 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e828776a..46c6e942 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject
+ .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject message) {
@@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser {
return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+ logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ return Flux.empty();
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+ logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ return Flux.empty();
}
private Optional<FileMetaData> getFileMetaData(JsonObject message) {
@@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser {
private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
- if (fileData.isPresent()) {
- res.add(fileData.get());
- }
+ if (fileData.isPresent()) {
+ res.add(fileData.get());
}
}
return Flux.fromIterable(res);
@@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser {
private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}