diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-03 08:38:10 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-03 08:38:10 +0000 |
commit | f26ab6b4e6ac57759037d2c854b7390c7afb6456 (patch) | |
tree | a20c3607310ec7e14d58051b600367c22b06f362 /datafile-app-server | |
parent | 4229afc64d82cbd1ea1e43c92fcd6c9bed9e5137 (diff) |
Bugfixes, Generalizing Data File Collection to handle any type of file
- When a change ID was not configured, the task counter was not decreased.
The result was that the DFC stopped polling,
- When the check if a file is a already published fails
(most likely to a problem is the DR), the DFC will try
to publish it (instead of just ingnoring it).
Change-Id: If9f5b962210f809d5d2ae0aa60d3a7f99099c058
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index bac52659..0f220fde 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -117,10 +117,10 @@ public class ScheduledTasks { .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // .filter(this::isFeedConfigured) // .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(this::fetchFile, false, 1, 1) // .flatMap(this::publishToDataRouter, false, 1, 1) // .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // @@ -210,21 +210,19 @@ public class ScheduledTasks { } private boolean shouldBePublished(FileDataWithContext fileData) { - boolean result = false; Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { try { - result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), + boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + return result; } catch (DatafileTaskException e) { logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); + return true; // Publish it then } + } else { + return false; } - if (!result) { - currentNumberOfTasks.decrementAndGet(); - } - - return result; } private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { @@ -269,7 +267,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( "Consuming new file ready messages, current number of tasks: {}, published files: {}, " - + "number of subscriptions: {}", + + "number of subscriptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); |