diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 300ca601..bc73ddb2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.Counters; @@ -35,12 +36,12 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; - /** * This implements the main flow of the data file collector. Fetch file ready events from the * message router, fetch new files from the PNF publish these in the data router. @@ -84,10 +85,10 @@ public class ScheduledTasks { try { if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) { logger.info( - "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, " - + "published files: {}, number of queued VES events: {}", - getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), - threadPoolQueueSize.get()); + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, " + + "published files: {}, number of queued VES events: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), + threadPoolQueueSize.get()); return; } if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { @@ -99,15 +100,15 @@ public class ScheduledTasks { Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); createMainTask(context) // - .subscribe(ScheduledTasks::onSuccess, // - throwable -> { - onError(throwable, context); - currentNumberOfSubscriptions.decrementAndGet(); - }, // - () -> { - onComplete(context); - currentNumberOfSubscriptions.decrementAndGet(); - }); + .subscribe(ScheduledTasks::onSuccess, // + throwable -> { + onError(throwable, context); + currentNumberOfSubscriptions.decrementAndGet(); + }, // + () -> { + onComplete(context); + currentNumberOfSubscriptions.decrementAndGet(); + }); } catch (Exception e) { logger.error("Unexpected exception: {}", e.toString(), e); } @@ -115,21 +116,21 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // - .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // - .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread - .runOn(scheduler) // - .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // - .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .flatMap(fileData -> createMdcContext(fileData, context)) // - .filter(this::isFeedConfigured) // - .filter(this::shouldBePublished) // - .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(this::fetchFile, false, 1, 1) // - .flatMap(this::publishToDataRouter, false, 1, 1) // - .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // - .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // - .sequential(); + .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .runOn(scheduler) // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // + .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // + .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // + .flatMap(this::fetchFile, false, 1, 1) // + .flatMap(this::publishToDataRouter, false, 1, 1) // + .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // + .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // + .sequential(); } private class FileDataWithContext { @@ -157,7 +158,6 @@ public class ScheduledTasks { return this.counters; } - protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -213,7 +213,7 @@ public class ScheduledTasks { return true; } else { logger.info("No feed is configured for: {}, file ignored: {}", - fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); return false; } } @@ -223,7 +223,7 @@ public class ScheduledTasks { if (publishedFilesCache.put(localFilePath) == null) { try { boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), - fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); return result; } catch (DatafileTaskException e) { logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); @@ -237,9 +237,9 @@ public class ScheduledTasks { private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { MDC.setContextMap(fileData.context); return createFileCollector() // - .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, - fileData.context) // - .onErrorResume(exception -> handleFetchFileFailure(fileData)); + .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, + fileData.context) // + .onErrorResume(exception -> handleFetchFileFailure(fileData)); } private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) { @@ -257,8 +257,8 @@ public class ScheduledTasks { MDC.setContextMap(publishInfo.getContext()); return createDataRouterPublisher() - .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) - .onErrorResume(exception -> handlePublishFailure(publishInfo)); + .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) + .onErrorResume(exception -> handlePublishFailure(publishInfo)); } private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) { @@ -277,15 +277,15 @@ public class ScheduledTasks { */ Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, " - + "number of subscriptions: {}", - getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", + getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); try { return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); } catch (Exception e) { logger.error("Could not create message consumer task", e); return Flux.empty(); @@ -295,7 +295,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { MDC.setContextMap(context); logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), - this.applicationConfiguration.getDmaapConsumerConfiguration()); + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } |