diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 8b496ba2..037f495f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -23,8 +23,8 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; @@ -39,8 +39,8 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the - * message router, fetch new files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new + * files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -79,14 +79,14 @@ public class ScheduledTasks { applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // .subscribe(model -> onSuccess(model, context), // - thr -> onError(thr, context), // - () -> onComplete(context)); + thr -> onError(thr, context), // + () -> onComplete(context)); } catch (Exception e) { logger.error("Unexpected exception: ", e); } } - Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { + Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // @@ -115,8 +115,8 @@ public class ScheduledTasks { return currentNumberOfTasks.get(); } - protected DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); + protected DMaaPMessageConsumer createConsumerTask() { + return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { @@ -132,7 +132,7 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.info("Datafile file published {}", model.getInternalLocation()); } @@ -146,19 +146,19 @@ public class ScheduledTasks { boolean result = false; Path localFilePath = fileData.getLocalFilePath(); if (alreadyPublishedFiles.put(localFilePath) == null) { - result = !createPublishedChecker().execute(fileData.name(), contextMap); + result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); } return result; } - private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); return createFileCollector() - .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); @@ -168,15 +168,17 @@ public class ScheduledTasks { return Mono.empty(); } - private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { + private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model, + Map<String, String> contextMap) { MDC.setContextMap(contextMap); return createDataRouterPublisher() - .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { + private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model, + Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); @@ -198,7 +200,7 @@ public class ScheduledTasks { Map<String, String> contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // - .execute() // + .getMessageRouterResponse() // .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); } @@ -215,8 +217,7 @@ public class ScheduledTasks { try { Files.delete(localFile); } catch (Exception e) { - logger.trace("Could not delete file: {}", localFile); + logger.trace("Could not delete file: {}", localFile, e); } } - } |