summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-11-30 10:06:54 +0100
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-12-03 12:59:46 +0100
commit33951a66970f3703d9a71e36adab8692f272817b (patch)
tree3c71fd949aaa95df5adb280742b49ec5657a70cc
parenta3c452af58c12283d76019509dd605f67f14532c (diff)
Make DFC handle multiple messages from MR
DMaaP MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC. Change-Id: I78c2fc7f4512a07fadf61c2cf1f6399d466fc873 Issue-ID: DCAEGEN2-1001 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java54
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java142
2 files changed, 154 insertions, 42 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e828776a..46c6e942 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject
+ .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject message) {
@@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser {
return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+ logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ return Flux.empty();
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+ logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ return Flux.empty();
}
private Optional<FileMetaData> getFileMetaData(JsonObject message) {
@@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser {
private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
- if (fileData.isPresent()) {
- res.add(fileData.get());
- }
+ if (fileData.isPresent()) {
+ res.add(fileData.get());
}
}
return Flux.fromIterable(res);
@@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser {
private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index a9bc546f..0ae9ece4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -60,7 +60,7 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -108,7 +108,98 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[" + parsedString + "," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
+ throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[{\"event\":{}}," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.location(LOCATION)
@@ -132,7 +223,7 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
@Test
@@ -164,6 +255,27 @@ class DmaapConsumerJsonParserTest {
}
@Test
+ void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
+ // @formatter:off
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
+ }
+
+ @Test
void whenPassingCorrectJsonWithoutLocation_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
@@ -303,7 +415,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+ void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
// @formatter:off
JsonMessage message = new JsonMessage.JsonMessageBuilder()
.eventName(NR_RADIO_ERICSSON_EVENT_NAME)
@@ -320,27 +432,23 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().expectComplete().verify();
}
@Test
- void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
- JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
-
- String incorrectMessageString = message.toString();
- String parsedString = message.getParsed();
+ void whenPassingJsonWithNullJsonElement_noFileData() {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = new JsonParser().parse("{}");
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ .expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -364,11 +472,11 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectNextCount(0).expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -392,6 +500,6 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
}