From 77793c7a822e8bb7771bfb0cf9db2ce4dce865d4 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Thu, 11 Apr 2019 15:36:48 +0000 Subject: Robustness issue Limiting the number of paralell threads when retying of ftp or publish. Previously there was no upper limit on how many paralel threads that could be started by retry attempts. For instance, a worker thread with 100 new files would start 100 new threads when the ftt server was unavailable. Change-Id: Ia8792f03ea55c0c467ef248ddc9d59187c06a946 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr --- .../datafile/service/PublishedFileCache.java | 2 +- .../collectors/datafile/tasks/ScheduledTasks.java | 57 ++++++++++++++-------- .../datafile/tasks/ScheduledTasksTest.java | 2 + 3 files changed, 41 insertions(+), 20 deletions(-) diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 257c356c..2b3a0ef6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -63,7 +63,7 @@ public class PublishedFileCache { } } - int size() { + public int size() { return publishedFiles.size(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 037f495f..c047f15e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -33,19 +34,20 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int NUMBER_OF_WORKER_THREADS = 200; private static final int MAX_TASKS_FOR_POLLING = 50; private static final long DATA_ROUTER_MAX_RETRIES = 5; private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); @@ -56,8 +58,9 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); - PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); + PublishedFileCache publishedFilesCache = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. @@ -74,13 +77,27 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info( + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + return; + } + + currentNumberOfSubscriptions.incrementAndGet(); Map context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // .subscribe(model -> onSuccess(model, context), // - thr -> onError(thr, context), // - () -> onComplete(context)); + throwable -> { + onError(throwable, context); + currentNumberOfSubscriptions.decrementAndGet(); + }, // + () -> { + onComplete(context); + currentNumberOfSubscriptions.decrementAndGet(); + }); } catch (Exception e) { logger.error("Unexpected exception: ", e); } @@ -88,13 +105,13 @@ public class ScheduledTasks { Flux createMainTask(Map contextMap) { return fetchMoreFileReadyMessages() // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel .runOn(scheduler) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> fetchFile(fileData, contextMap)) // - .flatMap(model -> publishToDataRouter(model, contextMap)) // + .filter(fileData -> shouldBePublished(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) // + .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); @@ -104,7 +121,7 @@ public class ScheduledTasks { * called in regular intervals to remove out-dated cached information. */ public void purgeCachedInformation(Instant now) { - alreadyPublishedFiles.purge(now); + publishedFilesCache.purge(now); } protected PublishedChecker createPublishedChecker() { @@ -145,9 +162,13 @@ public class ScheduledTasks { private boolean shouldBePublished(FileData fileData, Map contextMap) { boolean result = false; Path localFilePath = fileData.getLocalFilePath(); - if (alreadyPublishedFiles.put(localFilePath) == null) { + if (publishedFilesCache.put(localFilePath) == null) { result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); } + if (!result) { + currentNumberOfTasks.decrementAndGet(); + } + return result; } @@ -163,7 +184,7 @@ public class ScheduledTasks { Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFilePath, contextMap); - alreadyPublishedFiles.remove(localFilePath); + publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -183,7 +204,7 @@ public class ScheduledTasks { logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); - alreadyPublishedFiles.remove(internalFileName); + publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -192,11 +213,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux fetchMoreFileReadyMessages() { - logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { - logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); - return Flux.empty(); - } + logger.info( + "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 24bb759f..e07ed02d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -293,8 +293,10 @@ public class ScheduledTasksTest { verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); + verifyNoMoreInteractions(dataRouterMock); } } -- cgit 1.2.3-korg