diff options
2 files changed, 40 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 3e4481dd..ae8c6215 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -33,19 +34,20 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int NUMBER_OF_WORKER_THREADS = 200; private static final int MAX_TASKS_FOR_POLLING = 50; private static final long DATA_ROUTER_MAX_RETRIES = 5; private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); @@ -56,8 +58,9 @@ public class ScheduledTasks { private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); - PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); + PublishedFileCache publishedFilesCache = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. @@ -74,13 +77,27 @@ public class ScheduledTasks { */ public void executeDatafileMainTask() { try { + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info( + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size()); + return; + } + + currentNumberOfSubscriptions.incrementAndGet(); Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // .subscribe(model -> onSuccess(model, context), // - thr -> onError(thr, context), // - () -> onComplete(context)); + throwable -> { + onError(throwable, context); + currentNumberOfSubscriptions.decrementAndGet(); + }, // + () -> { + onComplete(context); + currentNumberOfSubscriptions.decrementAndGet(); + }); } catch (Exception e) { logger.error("Unexpected exception: ", e); } @@ -88,13 +105,13 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel .runOn(scheduler) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> fetchFile(fileData, contextMap)) // - .flatMap(model -> publishToDataRouter(model, contextMap)) // + .filter(fileData -> shouldBePublished(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) // + .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); @@ -104,7 +121,7 @@ public class ScheduledTasks { * called in regular intervals to remove out-dated cached information. */ public void purgeCachedInformation(Instant now) { - alreadyPublishedFiles.purge(now); + publishedFilesCache.purge(now); } protected PublishedChecker createPublishedChecker() { @@ -149,9 +166,13 @@ public class ScheduledTasks { private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { boolean result = false; Path localFilePath = fileData.getLocalFilePath(); - if (alreadyPublishedFiles.put(localFilePath) == null) { + if (publishedFilesCache.put(localFilePath) == null) { result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); } + if (!result) { + currentNumberOfTasks.decrementAndGet(); + } + return result; } @@ -167,7 +188,7 @@ public class ScheduledTasks { Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFilePath, contextMap); - alreadyPublishedFiles.remove(localFilePath); + publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -187,7 +208,7 @@ public class ScheduledTasks { logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); - alreadyPublishedFiles.remove(internalFileName); + publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -196,11 +217,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { - logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { - logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); - return Flux.empty(); - } + logger.info( + "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 24bb759f..e07ed02d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -293,8 +293,10 @@ public class ScheduledTasksTest { verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); + verifyNoMoreInteractions(dataRouterMock); } } |