diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java | 32 |
1 files changed, 26 insertions, 6 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 6f3f6b72..0c62795e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; @@ -74,10 +75,20 @@ public class FileCollector { return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(numRetries, firstBackoff); + .retryBackoff(numRetries, firstBackoff) // + .flatMap(this::checkCollectedFile); } - private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) { + private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { + if (info.isPresent()) { + return Mono.just(info.get()); + } else { + // If there is no info, the file is not retrievable + return Mono.error(new DatafileTaskException("Non retryable file transfer failure")); + } + } + + private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -88,11 +99,19 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile, context)); - } catch (Exception throwable) { + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); + } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + e.toString()); + if (e.isRetryable()) { + return Mono.error(e); + } else { + return Mono.just(Optional.empty()); // Give up + } + } catch (Exception throwable) { + logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); - return Mono.error(throwable); + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -107,7 +126,8 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // |