summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/config/application.yaml1
-rw-r--r--datafile-app-server/pom.xml2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java12
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java142
5 files changed, 166 insertions, 45 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index cef185c6..b66f7b6e 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -14,6 +14,7 @@ logging:
ROOT: ERROR
org.springframework: ERROR
org.springframework.data: ERROR
+ org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.dcaegen2.collectors.datafile: ERROR
file: opt/log/application.log
app:
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 51d9cecc..aaa9c1a0 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>1.1.1-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e828776a..46c6e942 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser {
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject
+ .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject message) {
@@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser {
return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+ logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ return Flux.empty();
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+ logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ return Flux.empty();
}
private Optional<FileMetaData> getFileMetaData(JsonObject message) {
@@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser {
private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
- if (fileData.isPresent()) {
- res.add(fileData.get());
- }
+ if (fileData.isPresent()) {
+ res.add(fileData.get());
}
}
return Flux.fromIterable(res);
@@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser {
private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 171dd024..c465fe94 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -59,11 +60,18 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
-
+ //@formatter:off
consumeFromDmaapMessage()
+ .publishOn(Schedulers.parallel())
+ .cache()
.doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+ .flatMap(this::collectFilesFromXnf)
+ .retry(3)
+ .cache()
+ .flatMap(this::publishToDmaapConfiguration)
+ .retry(3)
.subscribe(this::onSuccess, this::onError, this::onComplete);
+ //@formatter:on
}
private void onComplete() {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index a9bc546f..0ae9ece4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -60,7 +60,7 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -108,7 +108,98 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[" + parsedString + "," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
+ throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[{\"event\":{}}," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.location(LOCATION)
@@ -132,7 +223,7 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
@Test
@@ -164,6 +255,27 @@ class DmaapConsumerJsonParserTest {
}
@Test
+ void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
+ // @formatter:off
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
+ }
+
+ @Test
void whenPassingCorrectJsonWithoutLocation_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
@@ -303,7 +415,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+ void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
// @formatter:off
JsonMessage message = new JsonMessage.JsonMessageBuilder()
.eventName(NR_RADIO_ERICSSON_EVENT_NAME)
@@ -320,27 +432,23 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().expectComplete().verify();
}
@Test
- void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
- JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
-
- String incorrectMessageString = message.toString();
- String parsedString = message.getParsed();
+ void whenPassingJsonWithNullJsonElement_noFileData() {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = new JsonParser().parse("{}");
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ .expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -364,11 +472,11 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectNextCount(0).expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -392,6 +500,6 @@ class DmaapConsumerJsonParserTest {
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
}