From 03cc930719ef77373dacfbb151f11f42fc01f0ea Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Mon, 3 Dec 2018 12:50:45 +0100 Subject: Make DFC handle multiple messages from MR DMaaP MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC. Change-Id: I3bd1f62d68351d7149b94c9d6331777d485a7d53 Issue-ID: DCAEGEN2-1001 Signed-off-by: elinuxhenrik --- datafile-app-server/pom.xml | 2 +- .../datafile/service/DmaapConsumerJsonParser.java | 121 +++++------ .../service/DmaapConsumerJsonParserTest.java | 238 +++++++++++++++++---- datafile-commons/pom.xml | 2 +- datafile-dmaap-client/pom.xml | 2 +- pom.xml | 2 +- version.properties | 2 +- 7 files changed, 257 insertions(+), 112 deletions(-) diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index 24267459..7e39ef05 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -24,7 +24,7 @@ org.onap.dcaegen2.collectors datafile - 1.0.4-SNAPSHOT + 1.0.5-SNAPSHOT org.onap.dcaegen2.collectors.datafile diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index cfd06db3..78c5e98d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.slf4j.Logger; @@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser { * @return reactive Mono with an array of FileData */ public Flux getJsonObject(Mono rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) + : getFileDataFromJsonArray(jsonElement); } private Flux getFileDataFromJsonArray(JsonElement jsonElement) { - return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional getJsonObjectFromAnArray(JsonElement element) { - logger.trace("starting to getJsonObjectFromAnArray!"); - - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux create(Mono jsonObject) { - return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + private Flux create(Flux jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transform(monoJsonP)); } - private Flux transform(JsonObject jsonObject) { - if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { - JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); - String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); - String changeType = getValueFromJson(notificationFields, CHANGE_TYPE); - String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); - JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); - if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) + private Flux transform(JsonObject message) { + JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); + String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); + String changeType = getValueFromJson(notificationFields, CHANGE_TYPE); + String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); + JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); + + if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) && arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); - } + return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); + } - if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Flux.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); - } else if (arrayOfNamedHashMap != null) { - return Flux.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); - } - return Flux.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { + logger.error("FileReady event header is missing information. {}", message); + } else if (arrayOfNamedHashMap != null) { + logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message); } - return Flux.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + return Flux.empty(); } private Flux getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - if (arrayOfAdditionalFields.get(i) != null) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType); - - if (fileData != null) { - res.add(fileData); - } else { - logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); - } + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType); + + if (fileData != null) { + res.add(fileData); + } else { + logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); } } return Flux.fromIterable(res); @@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { - fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + // @formatter:off + fileData = ImmutableFileData.builder() + .name(name) + .changeIdentifier(changeIdentifier) + .changeType(changeType) + .location(location) + .compression(compression) + .fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion) + .build(); + // @formatter:on } return fileData; } @@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType) - && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); + && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { @@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser { } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { - return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) && - isStringIsNotNullAndNotEmpty(compression); + return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) + && isStringIsNotNullAndNotEmpty(compression); } - private boolean containsHeader(JsonObject jsonObject) { + private boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) { - return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header); - } - private boolean isStringIsNotNullAndNotEmpty(String string) { return string != null && !string.isEmpty(); } + + private Flux logErrorAndReturnEmptyFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java index b5457b82..0c483008 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java @@ -49,17 +49,32 @@ class DmaapConsumerJsonParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) - .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION) + void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField).build(); - FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + FileData expectedFileData = ImmutableFileData.builder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -72,14 +87,92 @@ class DmaapConsumerJsonParserTest { .expectNext(expectedFileData).verifyComplete(); } + @Test + void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + FileData expectedFileData = ImmutableFileData.builder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[" + parsedString + "," + parsedString + "]"; + DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + FileData expectedFileData = ImmutableFileData.builder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[{\"event\":{}}," + parsedString + "]"; + DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).verifyComplete(); + } + @Test void whenPassingCorrectJsonWihoutName_noFileData() { - AdditionalField additionalField = - new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField).build(); + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -94,12 +187,20 @@ class DmaapConsumerJsonParserTest { @Test void whenPassingCorrectJsonWihoutLocation_noFileData() { - AdditionalField additionalField = - new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField).build(); + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -114,11 +215,20 @@ class DmaapConsumerJsonParserTest { @Test void whenPassingCorrectJsonWihoutCompression_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField).build(); + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -133,11 +243,20 @@ class DmaapConsumerJsonParserTest { @Test void whenPassingCorrectJsonWihoutFileFormatType_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) - .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField).build(); + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -152,18 +271,38 @@ class DmaapConsumerJsonParserTest { @Test void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() { - AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME) - .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build(); - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) - .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION) + // @formatter:off + AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalFaultyField) + .addAdditionalField(additionalField) .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build(); - FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + FileData expectedFileData = ImmutableFileData.builder() + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); @@ -177,9 +316,14 @@ class DmaapConsumerJsonParserTest { } @Test - void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() { - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES_INVALID") - .changeType("FileReady_INVALID").notificationFieldsVersion("1.0_INVALID").build(); + void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() { + // @formatter:off + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .changeIdentifier("PM_MEAS_FILES_INVALID") + .changeType("FileReady_INVALID") + .notificationFieldsVersion("1.0_INVALID") + .build(); + // @formatter:on String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); @@ -189,11 +333,11 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + .expectSubscription().verifyComplete(); } @Test - void whenPassingJsonWithNullJsonElement_validationThrowingAnException() { + void whenPassingJsonWithNullJsonElement_noFileData() { JsonMessage message = new JsonMessage.JsonMessageBuilder().build(); String incorrectMessageString = message.toString(); @@ -205,6 +349,6 @@ class DmaapConsumerJsonParserTest { .getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + .expectSubscription().verifyComplete(); } } diff --git a/datafile-commons/pom.xml b/datafile-commons/pom.xml index 8c6e2e6a..080c5f97 100644 --- a/datafile-commons/pom.xml +++ b/datafile-commons/pom.xml @@ -24,7 +24,7 @@ org.onap.dcaegen2.collectors datafile - 1.0.4-SNAPSHOT + 1.0.5-SNAPSHOT org.onap.dcaegen2.collectors.datafile diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml index 52394ad5..562eb29c 100644 --- a/datafile-dmaap-client/pom.xml +++ b/datafile-dmaap-client/pom.xml @@ -24,7 +24,7 @@ org.onap.dcaegen2.collectors datafile - 1.0.4-SNAPSHOT + 1.0.5-SNAPSHOT org.onap.dcaegen2.collectors.datafile diff --git a/pom.xml b/pom.xml index 5bd941f5..78429943 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ org.onap.dcaegen2.collectors datafile - 1.0.4-SNAPSHOT + 1.0.5-SNAPSHOT dcaegen2-collectors.datafile datafile collector diff --git a/version.properties b/version.properties index d4894448..65b11d19 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=0 -patch=4 +patch=5 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg