diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2018-12-10 01:38:41 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-12-10 01:38:41 +0000 |
commit | 0fe8d03a2f77d975809194f8399c979009d66fef (patch) | |
tree | 3ed3246169282cd65431358f7cfe1106444e3ffe | |
parent | 693026c6b973c44ade969516be496f966e4fae86 (diff) | |
parent | 33951a66970f3703d9a71e36adab8692f272817b (diff) |
Merge "Make DFC handle multiple messages from MR"
2 files changed, 154 insertions, 42 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e828776a..46c6e942 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser { * @return reactive Mono with an array of FileData */ public Flux<FileData> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getFileDataFromJsonArray(jsonElement); } private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { - return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - logger.trace("starting to getJsonObjectFromAnArray!"); - - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux<FileData> create(Mono<JsonObject> jsonObject) { - return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject)) - : transform(monoJsonP)); + private Flux<FileData> create(Flux<JsonObject> jsonObject) { + return jsonObject + .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transform(monoJsonP)); } private Flux<FileData> transform(JsonObject message) { @@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser { return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message)); + logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + return Flux.empty(); } - return Flux.error(new DmaapNotFoundException( - "Unable to collect file from xNF. FileReady event has incorrect JsonObject")); + logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + return Flux.empty(); } private Optional<FileMetaData> getFileMetaData(JsonObject message) { @@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser { private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - if (arrayOfAdditionalFields.get(i) != null) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); - if (fileData.isPresent()) { - res.add(fileData.get()); - } + if (fileData.isPresent()) { + res.add(fileData.get()); } } return Flux.fromIterable(res); @@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser { private boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } + + private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java index a9bc546f..0ae9ece4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java @@ -60,7 +60,7 @@ class DmaapConsumerJsonParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException { + void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -108,7 +108,98 @@ class DmaapConsumerJsonParserTest { } @Test - void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() { + void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + FileMetaData fileMetaData = ImmutableFileMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .fileMetaData(fileMetaData) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[" + parsedString + "," + parsedString + "]"; + DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() + throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + FileMetaData fileMetaData = ImmutableFileMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .fileMetaData(fileMetaData) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[{\"event\":{}}," + parsedString + "]"; + DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithFaultyEventName_noFileData() { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .location(LOCATION) @@ -132,7 +223,7 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectError(DmaapNotFoundException.class).verify(); + .expectComplete().verify(); } @Test @@ -164,6 +255,27 @@ class DmaapConsumerJsonParserTest { } @Test + void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() { + // @formatter:off + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNextCount(0).verifyComplete(); + } + + @Test void whenPassingCorrectJsonWithoutLocation_noFileData() { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() @@ -303,7 +415,7 @@ class DmaapConsumerJsonParserTest { } @Test - void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() { + void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() { // @formatter:off JsonMessage message = new JsonMessage.JsonMessageBuilder() .eventName(NR_RADIO_ERICSSON_EVENT_NAME) @@ -320,27 +432,23 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + .expectSubscription().expectComplete().verify(); } @Test - void whenPassingJsonWithNullJsonElement_validationThrowingAnException() { - JsonMessage message = new JsonMessage.JsonMessageBuilder().build(); - - String incorrectMessageString = message.toString(); - String parsedString = message.getParsed(); + void whenPassingJsonWithNullJsonElement_noFileData() { DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = new JsonParser().parse("{}"); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription() + .expectComplete().verify(); } @Test - void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() { + void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -364,11 +472,11 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectError(DmaapNotFoundException.class).verify(); + .expectNextCount(0).expectComplete().verify(); } @Test - void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() { + void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -392,6 +500,6 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectError(DmaapNotFoundException.class).verify(); + .expectComplete().verify(); } } |