From 7b84c15301abbca5ae586a6cbfcf482570a8c35f Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 17 May 2019 06:19:25 +0000 Subject: Fix for robustness In the long time stability test it has showed that when there are an exessive amount of FileReady events, the DFC will fetch these from the MR and build up an internal queue. This has the effect that the DFC will consume too much memory. Itmight also affect load balancing (in case of several DFC instances) so that one might consume all events and the others get nothing. The DFC should not fetch new FileReady events when it fully loaded with work. Change-Id: I58665edd678d2f1c8a32d0e56455228b522aab40 Issue-ID: DCAEGEN2-1509 Signed-off-by: PatrikBuhr --- .../collectors/datafile/service/JsonMessageParser.java | 7 +++---- .../collectors/datafile/tasks/ScheduledTasks.java | 17 ++++++++++++----- .../collectors/datafile/tasks/ScheduledTasksTest.java | 1 + 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 5bcf18c6..3a3eb3aa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -107,10 +107,9 @@ public class JsonMessageParser { } private Flux getMessagesFromJsonArray(JsonElement jsonElement) { - return createMessages( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new)))); } /** diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 5cc894c3..36120fae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -59,6 +59,7 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); @@ -78,10 +79,11 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) { logger.info( - "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", - getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), + threadPoolQueueSize.get()); return; } @@ -106,18 +108,19 @@ public class ScheduledTasks { Flux createMainTask(Map context) { return fetchMoreFileReadyMessages() // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // - .flatMap(this::publishToDataRouter,false, 1, 1) // + .flatMap(this::publishToDataRouter, false, 1, 1) // .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // .sequential(); - } private class FileDataWithContext { @@ -153,6 +156,10 @@ public class ScheduledTasks { return currentNumberOfSubscriptions.get(); } + public int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 1a7db424..0d5a4231 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -202,6 +202,7 @@ public class ScheduledTasksTest { .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); + assertEquals(0, testedObject.getThreadPoolQueueSize()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull()); -- cgit 1.2.3-korg