diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index b5fa0c24..bac52659 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -86,13 +87,16 @@ public class ScheduledTasks { threadPoolQueueSize.get()); return; } + if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { + logger.warn("No configuration loaded, skipping polling for messages"); + return; + } currentNumberOfSubscriptions.incrementAndGet(); Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(this::onSuccess, // + .subscribe(ScheduledTasks::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -115,6 +119,7 @@ public class ScheduledTasks { .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // .flatMap(this::publishToDataRouter, false, 1, 1) // @@ -124,13 +129,13 @@ public class ScheduledTasks { } private class FileDataWithContext { - FileDataWithContext(FileData fileData, Map<String, String> context) { + public final FileData fileData; + public final Map<String, String> context; + + public FileDataWithContext(FileData fileData, Map<String, String> context) { this.fileData = fileData; this.context = context; } - - final FileData fileData; - final Map<String, String> context; } /** @@ -160,7 +165,7 @@ public class ScheduledTasks { return this.threadPoolQueueSize.get(); } - protected DMaaPMessageConsumer createConsumerTask() { + protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -172,17 +177,17 @@ public class ScheduledTasks { return new DataRouterPublisher(applicationConfiguration); } - private void onComplete(Map<String, String> contextMap) { + private static void onComplete(Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation publishInfo) { + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map<String, String> context) { + private static void onError(Throwable throwable, Map<String, String> context) { MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } @@ -194,11 +199,26 @@ public class ScheduledTasks { return Mono.just(pair); } + private boolean isFeedConfigured(FileDataWithContext fileData) { + if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) { + return true; + } else { + logger.info("No feed is configured for: {}, file ignored: {}", + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + return false; + } + } + private boolean shouldBePublished(FileDataWithContext fileData) { boolean result = false; Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); + try { + result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + } catch (DatafileTaskException e) { + logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); + } } if (!result) { currentNumberOfTasks.decrementAndGet(); @@ -248,13 +268,19 @@ public class ScheduledTasks { */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); - return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + try { + return createConsumerTask() // + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + } catch (Exception e) { + logger.error("Could not create message consumer task", e); + return Flux.empty(); + } } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { @@ -264,7 +290,7 @@ public class ScheduledTasks { return Flux.empty(); } - private void deleteFile(Path localFile, Map<String, String> context) { + private static void deleteFile(Path localFile, Map<String, String> context) { MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { |