diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 156 |
1 files changed, 63 insertions, 93 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index d41e5c25..b4096c73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -20,8 +20,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; @@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - - /** Data needed for fetching of one file. */ - private class FileCollectionData { - final FileData fileData; - final MessageMetaData metaData; - - FileCollectionData(FileData fd, MessageMetaData metaData) { - this.fileData = fd; - this.metaData = metaData; - } - } + private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int MAX_TASKS_FOR_POLLING = 50; + private static final long DATA_ROUTER_MAX_RETRIES = 5; + private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); + private static final long FILE_TRANSFER_MAX_RETRIES = 3; + private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5); private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); - private final Scheduler scheduler = - Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); + private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** @@ -86,21 +75,25 @@ public class ScheduledTasks { * Main function for scheduling for the file collection Workflow. */ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); - createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), - () -> onComplete(contextMap)); + try { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Execution of tasks was registered"); + applicationConfiguration.loadConfigurationFromFile(); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); + } catch (Exception e) { + logger.error("Unexpected exception: ", e); + } } Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // - .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // - .flatMap(this::createFileCollectionTask) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap)) // .flatMap(model -> publishToDataRouter(model, contextMap)) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // @@ -114,63 +107,61 @@ public class ScheduledTasks { alreadyPublishedFiles.purge(now); } - private void onComplete(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile tasks have been completed"); + protected PublishedChecker createPublishedChecker() { + return new PublishedChecker(applicationConfiguration); } - private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile consumed tasks {}", model.getInternalLocation()); + protected int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); } - private void onError(Throwable throwable, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + protected DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - private int getParallelism() { - if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { - return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); - } else { - return 1; // We need at least one rail/thread - } + protected FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); + } + + protected DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); } - private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { - List<FileCollectionData> fileCollects = new ArrayList<>(); + private void onComplete(Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Datafile tasks have been completed"); + } - for (FileData fileData : availableFiles.files()) { - fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); - } - return Flux.fromIterable(fileCollects); + private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.info("Datafile file published {}", model.getInternalLocation()); } - private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) { + private void onError(Throwable throwable, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); + } + + private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { boolean result = false; - Path localFileName = task.fileData.getLocalFileName(); + Path localFileName = fileData.getLocalFileName(); if (alreadyPublishedFiles.put(localFileName) == null) { result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap); } return result; } - private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect, - Map<String, String> contextMap) { - final long maxNUmberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - + private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); return createFileCollector() - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, - contextMap) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); + .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); - logger.error("File fetching failed: {}", localFileName); + logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); @@ -178,19 +169,16 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { - final long maxNumberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - - MdcVariables.setMdcContextMap(contextMap); - return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); + + return createDataRouterPublisher() + .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception, - Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); @@ -202,8 +190,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { - logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { + logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); return Flux.empty(); } @@ -215,7 +204,8 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("Polling for file ready message failed, exception: {}", exception); + logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } @@ -229,24 +219,4 @@ public class ScheduledTasks { } } - PublishedChecker createPublishedChecker() { - return new PublishedChecker(applicationConfiguration); - } - - int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); - } - - DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); - } - - FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); - } - - DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); - } - } |