From 511eaecfebcc89346756fe43ef43e6c106602cf6 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 17 May 2019 06:38:19 +0000 Subject: Fix, skip FTP retry in certain cases In certain conditions there is no reason to retry fetching files. For instance when the file is removed in the PNF or when the password/certificate is wrong. When the DFC is started there are always queued VES events that referes to removed files which in turn results in that the DFC will retry fetching these files in vain. The DFC house keeps its number of concurrents tasks to not exeed quotas for memory,open file descriptors etc. As more threads are occupied with retrying, the fewer threads can do their intended work, which decreases the throughput. Testing has showed that already when the number of PNFs are 10, the throughput is radically decreased (and the problem is then escalating). Change-Id: I4833724a3ef3509025f4a0a438c6c8025932b0f4 Issue-ID: DCAEGEN2-1508 Signed-off-by: PatrikBuhr --- .../datafile/exceptions/DatafileTaskException.java | 17 ++++++++++++ .../collectors/datafile/ftp/FtpsClient.java | 7 +++-- .../collectors/datafile/ftp/SftpClient.java | 11 ++++++-- .../collectors/datafile/model/FileData.java | 1 + .../collectors/datafile/tasks/FileCollector.java | 32 ++++++++++++++++++---- .../collectors/datafile/tasks/ScheduledTasks.java | 5 ++-- .../datafile/tasks/FileCollectorTest.java | 17 ++++++++++++ 7 files changed, 77 insertions(+), 13 deletions(-) (limited to 'datafile-app-server') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 42308000..38f74ed1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,12 +21,29 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; + private final boolean isRetryable; public DatafileTaskException(String message) { super(message); + isRetryable = true; + } + + public DatafileTaskException(String message, boolean retry) { + super(message); + isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); + isRetryable = true; + } + + public DatafileTaskException(String message, Exception originalException, boolean retry) { + super(message, originalException); + isRetryable = retry; + } + + public boolean isRetryable() { + return this.isRetryable; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index b8488f34..c78ae3a3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -28,9 +28,11 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.Optional; + import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; + import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; @@ -117,7 +119,8 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - throw new DatafileTaskException("Could not retrieve file " + remoteFileName); + final boolean retry = false; // Skip retrying for all problems except IOException + throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); @@ -173,7 +176,7 @@ public class FtpsClient implements FileCollectClient { protected OutputStream createOutputStream(Path localFileName) throws IOException { File localFile = localFileName.toFile(); - if (localFile.createNewFile()) { + if (!localFile.createNewFile()) { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 40068598..333be92a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -21,8 +21,11 @@ import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +56,9 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); - } catch (Exception e) { - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } catch (SftpException e) { + boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); } logger.trace("collectFile OK"); @@ -81,7 +85,8 @@ public class SftpClient implements FileCollectClient { sftpChannel = getChannel(session); } } catch (JSchException e) { - throw new DatafileTaskException("Could not open Sftp client" + e, e); + boolean retry = !e.getMessage().contains("Auth fail"); + throw new DatafileTaskException("Could not open Sftp client" + e, e, retry); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 96237e41..0a6b669c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -24,6 +24,7 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 6f3f6b72..0c62795e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; @@ -74,10 +75,20 @@ public class FileCollector { return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(numRetries, firstBackoff); + .retryBackoff(numRetries, firstBackoff) // + .flatMap(this::checkCollectedFile); } - private Mono collectFile(FileData fileData, Map context) { + private Mono checkCollectedFile(Optional info) { + if (info.isPresent()) { + return Mono.just(info.get()); + } else { + // If there is no info, the file is not retrievable + return Mono.error(new DatafileTaskException("Non retryable file transfer failure")); + } + } + + private Mono> collectFile(FileData fileData, Map context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -88,11 +99,19 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile, context)); - } catch (Exception throwable) { + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); + } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + e.toString()); + if (e.isRetryable()) { + return Mono.error(e); + } else { + return Mono.just(Optional.empty()); // Give up + } + } catch (Exception throwable) { + logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); - return Mono.error(throwable); + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -107,7 +126,8 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map context) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + Map context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 5cc894c3..26d47b51 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -202,8 +202,9 @@ public class ScheduledTasks { private Mono fetchFile(FileDataWithContext fileData) { MDC.setContextMap(fileData.context); - return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, - FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context) + return createFileCollector() // + .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, + fileData.context) // .onErrorResume(exception -> handleFetchFileFailure(fileData)); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 1a9d6699..cad3486d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -213,6 +213,23 @@ public class FileCollectorTest { verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } + @Test + public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); + + final boolean retry = false; + FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); + doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + + StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) + .expectErrorMessage("Non retryable file transfer failure") // + .verify(); + + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + } + @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); -- cgit 1.2.3-korg