summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/pom.xml2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java121
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java238
3 files changed, 253 insertions, 108 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 24267459..7e39ef05 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.0.4-SNAPSHOT</version>
+ <version>1.0.5-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index cfd06db3..78c5e98d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.slf4j.Logger;
@@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
- private Flux<FileData> transform(JsonObject jsonObject) {
- if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
- JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
- String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
- String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
- String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
- JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
- if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
+ private Flux<FileData> transform(JsonObject message) {
+ JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
+ String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
+ String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
+ String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
+ JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
+
+ if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
&& arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
- }
+ return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
+ }
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ logger.error("FileReady event header is missing information. {}", message);
+ } else if (arrayOfNamedHashMap != null) {
+ logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message);
}
- return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ return Flux.empty();
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
-
- if (fileData != null) {
- res.add(fileData);
- } else {
- logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
- }
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
+
+ if (fileData != null) {
+ res.add(fileData);
+ } else {
+ logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
}
}
return Flux.fromIterable(res);
@@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
- fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ // @formatter:off
+ fileData = ImmutableFileData.builder()
+ .name(name)
+ .changeIdentifier(changeIdentifier)
+ .changeType(changeType)
+ .location(location)
+ .compression(compression)
+ .fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion)
+ .build();
+ // @formatter:on
}
return fileData;
}
@@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
- private boolean containsHeader(JsonObject jsonObject) {
+ private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
- return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
- }
-
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index b5457b82..0c483008 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -49,17 +49,32 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -73,13 +88,91 @@ class DmaapConsumerJsonParserTest {
}
@Test
+ void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[" + parsedString + "," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[{\"event\":{}}," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
void whenPassingCorrectJsonWihoutName_noFileData() {
- AdditionalField additionalField =
- new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -94,12 +187,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutLocation_noFileData() {
- AdditionalField additionalField =
- new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -114,11 +215,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutCompression_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -133,11 +243,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -152,18 +271,38 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
- .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ // @formatter:off
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalFaultyField)
+ .addAdditionalField(additionalField)
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -177,9 +316,14 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID").notificationFieldsVersion("1.0_INVALID").build();
+ void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
+ // @formatter:off
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier("PM_MEAS_FILES_INVALID")
+ .changeType("FileReady_INVALID")
+ .notificationFieldsVersion("1.0_INVALID")
+ .build();
+ // @formatter:on
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
@@ -189,11 +333,11 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().verifyComplete();
}
@Test
- void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
+ void whenPassingJsonWithNullJsonElement_noFileData() {
JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
String incorrectMessageString = message.toString();
@@ -205,6 +349,6 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().verifyComplete();
}
}