diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java | 46 |
1 files changed, 31 insertions, 15 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 3e444af0..cb93df1e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -20,24 +20,24 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; - import reactor.core.publisher.Mono; /** + * Collects a file from a PNF. + * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class FileCollector { @@ -45,22 +45,37 @@ public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); private final AppConfig datafileAppConfig; + /** + * Constructor. + * + * @param datafileAppConfig application configuration + */ public FileCollector(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; } - public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, + /** + * Collects a file from the PNF and stores it in the local file system. + * + * @param fileData data about the file to collect. + * @param numRetries the number of retries if the publishing fails + * @param firstBackoff the time to delay the first retry + * @param contextMap context for logging. + * + * @return the data needed to publish the file. + */ + public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { MDC.setContextMap(contextMap); - logger.trace("Entering execute with {}", fileData); + logger.trace("Entering collectFile with {}", fileData); return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + .retryBackoff(numRetries, firstBackoff); } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("starting to collectFile {}", fileData.name()); @@ -71,9 +86,10 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getConsumerDmaapModel(fileData, localFile)); + return Mono.just(getFilePublishInformation(fileData, localFile)); } catch (Exception throwable) { - logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + throwable.toString()); return Mono.error(throwable); } } @@ -89,10 +105,10 @@ public class FileCollector { } } - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); - return ImmutableConsumerDmaapModel.builder() // + return ImmutableFilePublishInformation.builder() // .productName(metaData.productName()) // .vendorName(metaData.vendorName()) // .lastEpochMicrosec(metaData.lastEpochMicrosec()) // @@ -109,12 +125,12 @@ public class FileCollector { } protected SftpClient createSftpClient(FileData fileData) { - return new SftpClient(fileData.fileServerData()); + return new SftpClient(fileData.fileServerData()); } protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), - Paths.get(config.trustedCA()), config.trustedCAPassword()); + Paths.get(config.trustedCa()), config.trustedCaPassword()); } } |