diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-04-11 15:36:48 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-04-11 15:36:48 +0000 |
commit | 77793c7a822e8bb7771bfb0cf9db2ce4dce865d4 (patch) | |
tree | 4d85053c31d5e1562bd22b40c05d26d6420fb713 /datafile-app-server | |
parent | 314a1e4310545e5b70ff64e328f3e7eae281c5b4 (diff) |
Robustness issue
Limiting the number of paralell threads when
retying of ftp or publish.
Previously there was no upper limit on how many paralel
threads that could be started by retry attempts.
For instance, a worker thread with 100 new files
would start 100 new threads when the ftt server
was unavailable.
Change-Id: Ia8792f03ea55c0c467ef248ddc9d59187c06a946
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
3 files changed, 41 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 257c356c..2b3a0ef6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -63,7 +63,7 @@ public class PublishedFileCache { } } - int size() { + public int size() { return publishedFiles.size(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 037f495f..c047f15e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -33,19 +34,20 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int NUMBER_OF_WORKER_THREADS = 200; private static final int MAX_TASKS_FOR_POLLING = 50; private static final long DATA_ROUTER_MAX_RETRIES = 5; private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); @@ -56,8 +58,9 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); - PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); + PublishedFileCache publishedFilesCache = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. @@ -74,13 +77,27 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info( + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + return; + } + + currentNumberOfSubscriptions.incrementAndGet(); Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // .subscribe(model -> onSuccess(model, context), // - thr -> onError(thr, context), // - () -> onComplete(context)); + throwable -> { + onError(throwable, context); + currentNumberOfSubscriptions.decrementAndGet(); + }, // + () -> { + onComplete(context); + currentNumberOfSubscriptions.decrementAndGet(); + }); } catch (Exception e) { logger.error("Unexpected exception: ", e); } @@ -88,13 +105,13 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel .runOn(scheduler) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> fetchFile(fileData, contextMap)) // - .flatMap(model -> publishToDataRouter(model, contextMap)) // + .filter(fileData -> shouldBePublished(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) // + .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); @@ -104,7 +121,7 @@ public class ScheduledTasks { * called in regular intervals to remove out-dated cached information. */ public void purgeCachedInformation(Instant now) { - alreadyPublishedFiles.purge(now); + publishedFilesCache.purge(now); } protected PublishedChecker createPublishedChecker() { @@ -145,9 +162,13 @@ public class ScheduledTasks { private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { boolean result = false; Path localFilePath = fileData.getLocalFilePath(); - if (alreadyPublishedFiles.put(localFilePath) == null) { + if (publishedFilesCache.put(localFilePath) == null) { result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); } + if (!result) { + currentNumberOfTasks.decrementAndGet(); + } + return result; } @@ -163,7 +184,7 @@ public class ScheduledTasks { Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFilePath, contextMap); - alreadyPublishedFiles.remove(localFilePath); + publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -183,7 +204,7 @@ public class ScheduledTasks { logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); - alreadyPublishedFiles.remove(internalFileName); + publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -192,11 +213,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { - logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { - logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); - return Flux.empty(); - } + logger.info( + "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 24bb759f..e07ed02d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -293,8 +293,10 @@ public class ScheduledTasksTest { verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); + verifyNoMoreInteractions(dataRouterMock); } } |