diff options
Diffstat (limited to 'datafile-app-server')
9 files changed, 79 insertions, 15 deletions
diff --git a/datafile-app-server/dpo/blueprints/k8s-datafile.yaml b/datafile-app-server/dpo/blueprints/k8s-datafile.yaml index 9ce0559a..534bc1fc 100644 --- a/datafile-app-server/dpo/blueprints/k8s-datafile.yaml +++ b/datafile-app-server/dpo/blueprints/k8s-datafile.yaml @@ -70,7 +70,7 @@ inputs: default: "dradmin" tag_version: type: string - default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.2" + default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.3" replicas: type: integer description: number of instances diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index c0c1da82..4da6d2d6 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors</groupId> <artifactId>datafile</artifactId> - <version>1.1.2-SNAPSHOT</version> + <version>1.1.3-SNAPSHOT</version> </parent> <groupId>org.onap.dcaegen2.collectors.datafile</groupId> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 42308000..38f74ed1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,12 +21,29 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; + private final boolean isRetryable; public DatafileTaskException(String message) { super(message); + isRetryable = true; + } + + public DatafileTaskException(String message, boolean retry) { + super(message); + isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); + isRetryable = true; + } + + public DatafileTaskException(String message, Exception originalException, boolean retry) { + super(message, originalException); + isRetryable = retry; + } + + public boolean isRetryable() { + return this.isRetryable; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index b8488f34..c78ae3a3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -28,9 +28,11 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.Optional; + import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; + import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; @@ -117,7 +119,8 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - throw new DatafileTaskException("Could not retrieve file " + remoteFileName); + final boolean retry = false; // Skip retrying for all problems except IOException + throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); @@ -173,7 +176,7 @@ public class FtpsClient implements FileCollectClient { protected OutputStream createOutputStream(Path localFileName) throws IOException { File localFile = localFileName.toFile(); - if (localFile.createNewFile()) { + if (!localFile.createNewFile()) { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 40068598..333be92a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -21,8 +21,11 @@ import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +56,9 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); - } catch (Exception e) { - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } catch (SftpException e) { + boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); } logger.trace("collectFile OK"); @@ -81,7 +85,8 @@ public class SftpClient implements FileCollectClient { sftpChannel = getChannel(session); } } catch (JSchException e) { - throw new DatafileTaskException("Could not open Sftp client" + e, e); + boolean retry = !e.getMessage().contains("Auth fail"); + throw new DatafileTaskException("Could not open Sftp client" + e, e, retry); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 96237e41..0a6b669c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -24,6 +24,7 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 6f3f6b72..0c62795e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; @@ -74,10 +75,20 @@ public class FileCollector { return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(numRetries, firstBackoff); + .retryBackoff(numRetries, firstBackoff) // + .flatMap(this::checkCollectedFile); } - private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) { + private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { + if (info.isPresent()) { + return Mono.just(info.get()); + } else { + // If there is no info, the file is not retrievable + return Mono.error(new DatafileTaskException("Non retryable file transfer failure")); + } + } + + private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -88,11 +99,19 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile, context)); - } catch (Exception throwable) { + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); + } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + e.toString()); + if (e.isRetryable()) { + return Mono.error(e); + } else { + return Mono.just(Optional.empty()); // Give up + } + } catch (Exception throwable) { + logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); - return Mono.error(throwable); + return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -107,7 +126,8 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 5cc894c3..26d47b51 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -202,8 +202,9 @@ public class ScheduledTasks { private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { MDC.setContextMap(fileData.context); - return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, - FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context) + return createFileCollector() // + .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, + fileData.context) // .onErrorResume(exception -> handleFetchFileFailure(fileData)); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 1a9d6699..cad3486d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -214,6 +214,23 @@ public class FileCollectorTest { } @Test + public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); + + final boolean retry = false; + FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); + doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + + StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) + .expectErrorMessage("Non retryable file transfer failure") // + .verify(); + + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + } + + @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); |