aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java32
1 files changed, 26 insertions, 6 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 6f3f6b72..0c62795e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
@@ -74,10 +75,20 @@ public class FileCollector {
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(numRetries, firstBackoff);
+ .retryBackoff(numRetries, firstBackoff) //
+ .flatMap(this::checkCollectedFile);
}
- private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ if (info.isPresent()) {
+ return Mono.just(info.get());
+ } else {
+ // If there is no info, the file is not retrievable
+ return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
+ }
+ }
+
+ private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
@@ -88,11 +99,19 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getFilePublishInformation(fileData, localFile, context));
- } catch (Exception throwable) {
+ return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
+ } catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ e.toString());
+ if (e.isRetryable()) {
+ return Mono.error(e);
+ } else {
+ return Mono.just(Optional.empty()); // Give up
+ }
+ } catch (Exception throwable) {
+ logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString());
- return Mono.error(throwable);
+ return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -107,7 +126,8 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
return ImmutableFilePublishInformation.builder() //