diff options
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 102 |
1 files changed, 64 insertions, 38 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 50f5431a..783c699c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -40,23 +40,23 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; + private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - /** Data needed for fetching of files from one PNF */ + /** Data needed for fetching of one file */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; // Same object, ftp session etc. can be used for each - // file in one VES - // event + final FileCollector collectorTask; final MessageMetaData metaData; FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { @@ -68,16 +68,15 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger taskCounter = new AtomicInteger(); - + private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final Scheduler scheduler = + Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. * * @param applicationConfiguration - application configuration - * @param xnfCollectorTask - second task - * @param dmaapPublisherTask - third task */ @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { @@ -90,20 +89,21 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); applicationConfiguration.initFileStreamReader(); - //@formatter:off - consumeMessagesFromDmaap() - .parallel() // Each FileReadyMessage in a separate thread - .runOn(Schedulers.parallel()) - .flatMap(this::createFileCollectionTask) - .filter(this::shouldBePublished) - .doOnNext(fileData -> taskCounter.incrementAndGet()) - .flatMap(this::collectFileFromXnf) - .flatMap(this::publishToDataRouter) - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) - .doOnNext(model -> taskCounter.decrementAndGet()) - .sequential() - .subscribe(this::onSuccess, this::onError, this::onComplete); - //@formatter:on + createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete); + } + + Flux<ConsumerDmaapModel> createMainTask() { + return fetchMoreFileReadyMessages() // + .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .runOn(scheduler) // + .flatMap(this::createFileCollectionTask) // + .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // + .flatMap(this::collectFileFromXnf) // + .flatMap(this::publishToDataRouter) // + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) // + .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // + .sequential(); } /** @@ -125,13 +125,20 @@ public class ScheduledTasks { logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); } + private int getParallelism() { + if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { + return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); + } else { + return 1; // We need at least one rail/thread + } + } + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), - new SftpClient(fileData.fileServerData())); - fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); + fileCollects.add( + new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } @@ -154,7 +161,7 @@ public class ScheduledTasks { logger.error("File fetching failed: {}", localFileName); deleteFile(localFileName); alreadyPublishedFiles.remove(localFileName); - taskCounter.decrementAndGet(); + currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -162,7 +169,7 @@ public class ScheduledTasks { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); - DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + DataRouterPublisher publisherTask = createDataRouterPublisher(); return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) .onErrorResume(exception -> handlePublishFailure(model, exception)); @@ -173,20 +180,21 @@ public class ScheduledTasks { Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); - taskCounter.decrementAndGet(); + currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Flux<FileReadyMessage> consumeMessagesFromDmaap() { - final int currentNumberOfTasks = taskCounter.get(); - logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); - if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + /** + * Fetch more messages from the message router. This is done in a polling/blocking fashion. + */ + private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { + logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { return Flux.empty(); } - final DMaaPMessageConsumerTask messageConsumerTask = - new DMaaPMessageConsumerTask(this.applicationConfiguration); - return messageConsumerTask.execute() // + return createConsumerTask() // + .execute() // .onErrorResume(this::handleConsumeMessageFailure); } @@ -200,7 +208,25 @@ public class ScheduledTasks { try { Files.delete(localFile); } catch (Exception e) { - logger.warn("Could not delete file: {}, {}", localFile, e); + logger.trace("Could not delete file: {}", localFile); } } + + int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); + } + + DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); + } + + FileCollector createFileCollector(FileData fileData) { + return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), + new SftpClient(fileData.fileServerData())); + } + + DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); + } + } |