diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-05-17 06:19:25 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-05-17 06:19:25 +0000 |
commit | 7b84c15301abbca5ae586a6cbfcf482570a8c35f (patch) | |
tree | 9dafb73194853ae2dde660c532c2fe2e63dea8f0 | |
parent | 75d51a299e7d36cb988ef074fce00eb4b29a3394 (diff) |
Fix for robustness
In the long time stability test it has showed that when there are an exessive amount of FileReady events, the DFC will fetch these from the MR and build up an internal queue.
This has the effect that the DFC will consume too much memory. Itmight also affect load balancing (in case of several DFC instances) so that one might consume all events and the others get nothing.
The DFC should not fetch new FileReady events when it fully loaded with work.
Change-Id: I58665edd678d2f1c8a32d0e56455228b522aab40
Issue-ID: DCAEGEN2-1509
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
3 files changed, 16 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 5bcf18c6..3a3eb3aa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -107,10 +107,9 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { - return createMessages( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new)))); } /** diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 5cc894c3..36120fae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -59,6 +59,7 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); @@ -78,10 +79,11 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) { logger.info( - "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", - getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), + threadPoolQueueSize.get()); return; } @@ -106,18 +108,19 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // - .flatMap(this::publishToDataRouter,false, 1, 1) // + .flatMap(this::publishToDataRouter, false, 1, 1) // .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // .sequential(); - } private class FileDataWithContext { @@ -153,6 +156,10 @@ public class ScheduledTasks { return currentNumberOfSubscriptions.get(); } + public int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 1a7db424..0d5a4231 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -202,6 +202,7 @@ public class ScheduledTasksTest { .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); + assertEquals(0, testedObject.getThreadPoolQueueSize()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull()); |