aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java58
1 files changed, 42 insertions, 16 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index b5fa0c24..bac52659 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -86,13 +87,16 @@ public class ScheduledTasks {
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -115,6 +119,7 @@ public class ScheduledTasks {
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
@@ -124,13 +129,13 @@ public class ScheduledTasks {
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
@@ -160,7 +165,7 @@ public class ScheduledTasks {
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -172,17 +177,17 @@ public class ScheduledTasks {
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -194,11 +199,26 @@ public class ScheduledTasks {
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -248,13 +268,19 @@ public class ScheduledTasks {
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
@@ -264,7 +290,7 @@ public class ScheduledTasks {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {