summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-05-17 06:19:25 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-05-17 06:19:25 +0000
commit7b84c15301abbca5ae586a6cbfcf482570a8c35f (patch)
tree9dafb73194853ae2dde660c532c2fe2e63dea8f0 /datafile-app-server
parent75d51a299e7d36cb988ef074fce00eb4b29a3394 (diff)
Fix for robustness
In the long time stability test it has showed that when there are an exessive amount of FileReady events, the DFC will fetch these from the MR and build up an internal queue. This has the effect that the DFC will consume too much memory. Itmight also affect load balancing (in case of several DFC instances) so that one might consume all events and the others get nothing. The DFC should not fetch new FileReady events when it fully loaded with work. Change-Id: I58665edd678d2f1c8a32d0e56455228b522aab40 Issue-ID: DCAEGEN2-1509 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java17
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java1
3 files changed, 16 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 5bcf18c6..3a3eb3aa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -107,10 +107,9 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
- return createMessages(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
- .orElseGet(JsonObject::new)))));
+ return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new))));
}
/**
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 5cc894c3..36120fae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -59,6 +59,7 @@ public class ScheduledTasks {
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
@@ -78,10 +79,11 @@ public class ScheduledTasks {
*/
public void executeDatafileMainTask() {
try {
- if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) {
logger.info(
- "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}",
- getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size());
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(),
+ threadPoolQueueSize.get());
return;
}
@@ -106,18 +108,19 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(this::publishToDataRouter,false, 1, 1) //
+ .flatMap(this::publishToDataRouter, false, 1, 1) //
.doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
.doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
-
}
private class FileDataWithContext {
@@ -153,6 +156,10 @@ public class ScheduledTasks {
return currentNumberOfSubscriptions.get();
}
+ public int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 1a7db424..0d5a4231 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -202,6 +202,7 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ assertEquals(0, testedObject.getThreadPoolQueueSize());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());