diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-05-17 06:38:19 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-05-17 06:38:19 +0000 |
commit | 511eaecfebcc89346756fe43ef43e6c106602cf6 (patch) | |
tree | af377c4d9cf9db9ad4fb21d9eecaae837e21c6cf /datafile-app-server | |
parent | 75d51a299e7d36cb988ef074fce00eb4b29a3394 (diff) |
Fix, skip FTP retry in certain cases
In certain conditions there is no reason to retry fetching files.
For instance when the file is removed in the PNF or when
the password/certificate is wrong.
When the DFC is started there are always
queued VES events that referes to removed files which in turn results
in that the DFC will retry fetching these files in vain.
The DFC house keeps its number of concurrents tasks to not exeed quotas
for memory,open file descriptors etc.
As more threads are occupied with retrying, the fewer threads can do
their intended work, which decreases the throughput.
Testing has showed that already when the number of PNFs are 10, the throughput
is radically decreased (and the problem is then escalating).
Change-Id: I4833724a3ef3509025f4a0a438c6c8025932b0f4
Issue-ID: DCAEGEN2-1508
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
7 files changed, 77 insertions, 13 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 42308000..38f74ed1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,12 +21,29 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; + private final boolean isRetryable; public DatafileTaskException(String message) { super(message); + isRetryable = true; + } + + public DatafileTaskException(String message, boolean retry) { + super(message); + isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); + isRetryable = true; + } + + public DatafileTaskException(String message, Exception originalException, boolean retry) { + super(message, originalException); + isRetryable = retry; + } + + public boolean isRetryable() { + return this.isRetryable; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index b8488f34..c78ae3a3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -28,9 +28,11 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.Optional; + import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; + import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; @@ -117,7 +119,8 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - throw new DatafileTaskException("Could not retrieve file " + remoteFileName); + final boolean retry = false; // Skip retrying for all problems except IOException + throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); @@ -173,7 +176,7 @@ public class FtpsClient implements FileCollectClient { protected OutputStream createOutputStream(Path localFileName) throws IOException { File localFile = localFileName.toFile(); - if (localFile.createNewFile()) { + if (!localFile.createNewFile()) { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 40068598..333be92a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -21,8 +21,11 @@ import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +56,9 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); - } catch (Exception e) { - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } catch (SftpException e) { + boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); } logger.trace("collectFile OK"); @@ -81,7 +85,8 @@ public class SftpClient implements FileCollectClient { sftpChannel = getChannel(session); } } catch (JSchException e) { - throw new DatafileTaskException("Could not open Sftp client" + e, e); + boolean retry = !e.getMessage().contains("Auth fail"); + throw new DatafileTaskException("Could not open Sftp client" + e, e, retry); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 96237e41..0a6b669c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -24,6 +24,7 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 6f3f6b72..0c62795e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; @@ -74,10 +75,20 @@ public class FileCollector { return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(numRetries, firstBackoff); + .retryBackoff(numRetries, firstBackoff) // + .flatMap(this::checkCollectedFile); } - private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) { + private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { + if (info.isPresent()) { + return Mono.just(info.get()); + } else { + // If there is no info, the file is not retrievable + return Mono.error(new DatafileTaskException("Non retryable file transfer failure")); + } + } + + private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -88,11 +99,19 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile, context)); - } catch (Exception throwable) { + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); + } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + e.toString()); + if (e.isRetryable()) { + return Mono.error(e); + } else { + return Mono.just(Optional.empty()); // Give up + } + } catch (Exception throwable) { + logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); - return Mono.error(throwable); + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -107,7 +126,8 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 5cc894c3..26d47b51 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -202,8 +202,9 @@ public class ScheduledTasks { private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { MDC.setContextMap(fileData.context); - return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, - FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context) + return createFileCollector() // + .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, + fileData.context) // .onErrorResume(exception -> handleFetchFileFailure(fileData)); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 1a9d6699..cad3486d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -214,6 +214,23 @@ public class FileCollectorTest { } @Test + public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); + + final boolean retry = false; + FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); + doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + + StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) + .expectErrorMessage("Non retryable file transfer failure") // + .verify(); + + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + } + + @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); |