aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java46
1 files changed, 31 insertions, 15 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 3e444af0..cb93df1e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -20,24 +20,24 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Collects a file from a PNF.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class FileCollector {
@@ -45,22 +45,37 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ /**
+ * Constructor.
+ *
+ * @param datafileAppConfig application configuration
+ */
public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ /**
+ * Collects a file from the PNF and stores it in the local file system.
+ *
+ * @param fileData data about the file to collect.
+ * @param numRetries the number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
+ * @param contextMap context for logging.
+ *
+ * @return the data needed to publish the file.
+ */
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Entering execute with {}", fileData);
+ logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ .retryBackoff(numRetries, firstBackoff);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
@@ -71,9 +86,10 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,10 +105,10 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
- return ImmutableConsumerDmaapModel.builder() //
+ return ImmutableFilePublishInformation.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
.lastEpochMicrosec(metaData.lastEpochMicrosec()) //
@@ -109,12 +125,12 @@ public class FileCollector {
}
protected SftpClient createSftpClient(FileData fileData) {
- return new SftpClient(fileData.fileServerData());
+ return new SftpClient(fileData.fileServerData());
}
protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
- Paths.get(config.trustedCA()), config.trustedCAPassword());
+ Paths.get(config.trustedCa()), config.trustedCaPassword());
}
}