From f26ab6b4e6ac57759037d2c854b7390c7afb6456 Mon Sep 17 00:00:00 2001
From: PatrikBuhr <patrik.buhr@est.tech>
Date: Mon, 3 Jun 2019 08:38:10 +0000
Subject: Bugfixes, Generalizing Data File Collection to handle any type of
 file

- When a change ID was not configured, the task counter was not decreased.
The result was that the DFC stopped polling,

- When the check if a file is a already published fails
(most likely to a problem is the DR), the DFC will try
to publish it (instead of just ingnoring it).

Change-Id: If9f5b962210f809d5d2ae0aa60d3a7f99099c058
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
---
 .../collectors/datafile/tasks/ScheduledTasks.java        | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

(limited to 'datafile-app-server')

diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index bac52659..0f220fde 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -117,10 +117,10 @@ public class ScheduledTasks {
                 .runOn(scheduler) //
                 .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
                 .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
-                .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
                 .flatMap(fileData -> createMdcContext(fileData, context)) //
                 .filter(this::isFeedConfigured) //
                 .filter(this::shouldBePublished) //
+                .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
                 .flatMap(this::fetchFile, false, 1, 1) //
                 .flatMap(this::publishToDataRouter, false, 1, 1) //
                 .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
@@ -210,21 +210,19 @@ public class ScheduledTasks {
     }
 
     private boolean shouldBePublished(FileDataWithContext fileData) {
-        boolean result = false;
         Path localFilePath = fileData.fileData.getLocalFilePath();
         if (publishedFilesCache.put(localFilePath) == null) {
             try {
-                result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+                boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
                         fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+                return result;
             } catch (DatafileTaskException e) {
                 logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+                return true; // Publish it then
             }
+        } else {
+            return false;
         }
-        if (!result) {
-            currentNumberOfTasks.decrementAndGet();
-        }
-
-        return result;
     }
 
     private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
@@ -269,7 +267,7 @@ public class ScheduledTasks {
     private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
         logger.info(
                 "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
-                + "number of subscriptions: {}",
+                        + "number of subscriptions: {}",
                 getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
 
         Map<String, String> context = MDC.getCopyOfContextMap();
-- 
cgit 1.2.3-korg