summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-12-03 12:50:45 +0100
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-12-03 12:55:53 +0100
commit03cc930719ef77373dacfbb151f11f42fc01f0ea (patch)
tree9bb2dd825a870d54d40dde53c2a307d028919941
parent7feb08ddbc51f53a5b0b3b1613940d09f7bc601f (diff)
Make DFC handle multiple messages from MR DMaaP
MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC. Change-Id: I3bd1f62d68351d7149b94c9d6331777d485a7d53 Issue-ID: DCAEGEN2-1001 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
-rw-r--r--datafile-app-server/pom.xml2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java121
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java238
-rw-r--r--datafile-commons/pom.xml2
-rw-r--r--datafile-dmaap-client/pom.xml2
-rw-r--r--pom.xml2
-rw-r--r--version.properties2
7 files changed, 257 insertions, 112 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 24267459..7e39ef05 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.0.4-SNAPSHOT</version>
+ <version>1.0.5-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index cfd06db3..78c5e98d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.slf4j.Logger;
@@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
- private Flux<FileData> transform(JsonObject jsonObject) {
- if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
- JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
- String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
- String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
- String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
- JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
- if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
+ private Flux<FileData> transform(JsonObject message) {
+ JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
+ String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
+ String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
+ String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
+ JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
+
+ if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
&& arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
- }
+ return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
+ }
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ logger.error("FileReady event header is missing information. {}", message);
+ } else if (arrayOfNamedHashMap != null) {
+ logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message);
}
- return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ return Flux.empty();
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
-
- if (fileData != null) {
- res.add(fileData);
- } else {
- logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
- }
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
+
+ if (fileData != null) {
+ res.add(fileData);
+ } else {
+ logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
}
}
return Flux.fromIterable(res);
@@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
- fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ // @formatter:off
+ fileData = ImmutableFileData.builder()
+ .name(name)
+ .changeIdentifier(changeIdentifier)
+ .changeType(changeType)
+ .location(location)
+ .compression(compression)
+ .fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion)
+ .build();
+ // @formatter:on
}
return fileData;
}
@@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
- private boolean containsHeader(JsonObject jsonObject) {
+ private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
- return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
- }
-
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index b5457b82..0c483008 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -49,17 +49,32 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -73,13 +88,91 @@ class DmaapConsumerJsonParserTest {
}
@Test
+ void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[" + parsedString + "," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[{\"event\":{}}," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
void whenPassingCorrectJsonWihoutName_noFileData() {
- AdditionalField additionalField =
- new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -94,12 +187,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutLocation_noFileData() {
- AdditionalField additionalField =
- new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -114,11 +215,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutCompression_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -133,11 +243,20 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField).build();
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -152,18 +271,38 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
- .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
- .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ // @formatter:off
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalFaultyField)
+ .addAdditionalField(additionalField)
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -177,9 +316,14 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID").notificationFieldsVersion("1.0_INVALID").build();
+ void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
+ // @formatter:off
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .changeIdentifier("PM_MEAS_FILES_INVALID")
+ .changeType("FileReady_INVALID")
+ .notificationFieldsVersion("1.0_INVALID")
+ .build();
+ // @formatter:on
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
@@ -189,11 +333,11 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().verifyComplete();
}
@Test
- void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
+ void whenPassingJsonWithNullJsonElement_noFileData() {
JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
String incorrectMessageString = message.toString();
@@ -205,6 +349,6 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().verifyComplete();
}
}
diff --git a/datafile-commons/pom.xml b/datafile-commons/pom.xml
index 8c6e2e6a..080c5f97 100644
--- a/datafile-commons/pom.xml
+++ b/datafile-commons/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.0.4-SNAPSHOT</version>
+ <version>1.0.5-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml
index 52394ad5..562eb29c 100644
--- a/datafile-dmaap-client/pom.xml
+++ b/datafile-dmaap-client/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.0.4-SNAPSHOT</version>
+ <version>1.0.5-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/pom.xml b/pom.xml
index 5bd941f5..78429943 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.0.4-SNAPSHOT</version>
+ <version>1.0.5-SNAPSHOT</version>
<name>dcaegen2-collectors.datafile</name>
<description>datafile collector</description>
diff --git a/version.properties b/version.properties
index d4894448..65b11d19 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=1
minor=0
-patch=4
+patch=5
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT