diff options
3 files changed, 16 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 5bcf18c6..3a3eb3aa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -107,10 +107,9 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { - return createMessages( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new)))); } /** diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 26d47b51..b5fa0c24 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -59,6 +59,7 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); @@ -78,10 +79,11 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) { logger.info( - "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", - getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), + threadPoolQueueSize.get()); return; } @@ -106,18 +108,19 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // - .flatMap(this::publishToDataRouter,false, 1, 1) // + .flatMap(this::publishToDataRouter, false, 1, 1) // .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // .sequential(); - } private class FileDataWithContext { @@ -153,6 +156,10 @@ public class ScheduledTasks { return currentNumberOfSubscriptions.get(); } + public int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 1a7db424..0d5a4231 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -202,6 +202,7 @@ public class ScheduledTasksTest { .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); + assertEquals(0, testedObject.getThreadPoolQueueSize()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull()); |