aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java49
1 files changed, 25 insertions, 24 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 311f752d..20bf599b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -21,6 +21,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -70,15 +71,15 @@ public class FileCollector {
* @return the data needed to publish the file.
*/
public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff,
- Map<String, String> contextMap) {
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
- .cache() //
- .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
- .retryBackoff(numRetries, firstBackoff) //
- .flatMap(FileCollector::checkCollectedFile);
+ .cache() //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
+ .retryBackoff(numRetries, firstBackoff) //
+ .flatMap(FileCollector::checkCollectedFile);
}
private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
@@ -105,7 +106,7 @@ public class FileCollector {
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
} catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- e.toString());
+ e.toString());
counters.incNoOfFailedFtpAttempts();
if (e instanceof NonRetryableDatafileTaskException) {
return Mono.just(Optional.empty()); // Give up
@@ -114,7 +115,7 @@ public class FileCollector {
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString(), throwable);
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -131,25 +132,25 @@ public class FileCollector {
}
private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
- Map<String, String> context) {
+ Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
return ImmutableFilePublishInformation.builder() //
- .productName(metaData.productName()) //
- .vendorName(metaData.vendorName()) //
- .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
- .sourceName(metaData.sourceName()) //
- .startEpochMicrosec(metaData.startEpochMicrosec()) //
- .timeZoneOffset(metaData.timeZoneOffset()) //
- .name(fileData.name()) //
- .location(location) //
- .internalLocation(localFile) //
- .compression(fileData.compression()) //
- .fileFormatType(fileData.fileFormatType()) //
- .fileFormatVersion(fileData.fileFormatVersion()) //
- .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
- .context(context) //
- .build();
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
+ .context(context) //
+ .build();
}
protected SftpClient createSftpClient(FileData fileData) {
@@ -159,6 +160,6 @@ public class FileCollector {
protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
- Paths.get(config.trustedCa()), config.trustedCaPassword());
+ Paths.get(config.trustedCa()), config.trustedCaPassword());
}
}