aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-06-03 08:38:10 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-06-03 08:38:10 +0000
commitf26ab6b4e6ac57759037d2c854b7390c7afb6456 (patch)
treea20c3607310ec7e14d58051b600367c22b06f362
parent4229afc64d82cbd1ea1e43c92fcd6c9bed9e5137 (diff)
Bugfixes, Generalizing Data File Collection to handle any type of file
- When a change ID was not configured, the task counter was not decreased. The result was that the DFC stopped polling, - When the check if a file is a already published fails (most likely to a problem is the DR), the DFC will try to publish it (instead of just ingnoring it). Change-Id: If9f5b962210f809d5d2ae0aa60d3a7f99099c058 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java16
1 files changed, 7 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index bac52659..0f220fde 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -117,10 +117,10 @@ public class ScheduledTasks {
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
- .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
.filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
.doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
@@ -210,21 +210,19 @@ public class ScheduledTasks {
}
private boolean shouldBePublished(FileDataWithContext fileData) {
- boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
try {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ return result;
} catch (DatafileTaskException e) {
logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ return true; // Publish it then
}
+ } else {
+ return false;
}
- if (!result) {
- currentNumberOfTasks.decrementAndGet();
- }
-
- return result;
}
private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
@@ -269,7 +267,7 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
"Consuming new file ready messages, current number of tasks: {}, published files: {}, "
- + "number of subscriptions: {}",
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();