summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java30
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java83
3 files changed, 150 insertions, 26 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
new file mode 100644
index 00000000..c2b97da8
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+public class RetryTimer {
+ public void waitRetryTime() {
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ // Nothing, no one will interrupt.
+ }
+
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
index be6ac9cc..306c2ded 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
@@ -22,6 +22,8 @@ import java.net.URI;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
@@ -51,6 +53,7 @@ public class XnfCollectorTaskImpl implements XnfCollectorTask {
private final FtpsClient ftpsClient;
private final SftpClient sftpClient;
+ private RetryTimer retryTimer;
@Autowired
protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
@@ -95,19 +98,20 @@ public class XnfCollectorTaskImpl implements XnfCollectorTask {
FileServerData fileServerData = getFileServerData(uri);
String remoteFile = uri.getPath();
String localFile = "target" + File.separator + fileData.name();
- String scheme = uri.getScheme();
- boolean fileDownloaded = false;
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile);
- } else if (SFTP.equals(scheme)) {
- fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- localFile = null;
- }
- if (!fileDownloaded) {
+ FileCollectClient currentClient = selectClient(fileData, uri);
+
+ if (currentClient != null) {
+ FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
+ if (!fileCollectResult.downloadSuccessful()) {
+ fileCollectResult = retry(fileCollectResult, currentClient);
+ }
+ if (!fileCollectResult.downloadSuccessful()) {
+ localFile = null;
+ logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
+ fileServerData, fileCollectResult.getErrorData());
+ }
+ } else {
localFile = null;
}
return localFile;
@@ -128,6 +132,30 @@ public class XnfCollectorTaskImpl implements XnfCollectorTask {
return userInfo;
}
+ private FileCollectClient selectClient(FileData fileData, URI uri) {
+ FileCollectClient selectedClient = null;
+ String scheme = uri.getScheme();
+ if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
+ selectedClient = ftpsClient;
+ } else if (SFTP.equals(scheme)) {
+ selectedClient = sftpClient;
+ } else {
+ logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
+ FTPES, FTPS, SFTP, fileData);
+ }
+ return selectedClient;
+ }
+
+ private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
+ int retryCount = 1;
+ FileCollectResult newResult = fileCollectResult;
+ while (!newResult.downloadSuccessful() && retryCount++ < 3) {
+ getRetryTimer().waitRetryTime();
+ newResult = fileCollectClient.retryCollectFile();
+ }
+ return newResult;
+ }
+
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
String name = fileData.name();
String compression = fileData.compression();
@@ -137,4 +165,15 @@ public class XnfCollectorTaskImpl implements XnfCollectorTask {
return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
.fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
}
+
+ private RetryTimer getRetryTimer() {
+ if (retryTimer == null) {
+ retryTimer = new RetryTimer();
+ }
+ return retryTimer;
+ }
+
+ protected void setRetryTimer(RetryTimer retryTimer) {
+ this.retryTimer = retryTimer;
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 3a3f16c0..8251a65a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -16,6 +16,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -28,6 +29,8 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
@@ -74,8 +77,8 @@ public class XnfCollectorTaskImplTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
+ private RetryTimer retryTimerMock = mock(RetryTimer.class);
- private XnfCollectorTask collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
@BeforeAll
public static void setUpConfiguration() {
@@ -88,6 +91,8 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenFtpesFile_returnCorrectResponse() {
+ XnfCollectorTaskImpl collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+
FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
.compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -96,7 +101,7 @@ public class XnfCollectorTaskImplTest {
FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
.password(PWD).port(PORT_22).build();
when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(Boolean.TRUE);
+ .thenReturn(new FileCollectResult());
ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
.location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
@@ -115,6 +120,8 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenSftpFile_returnCorrectResponse() {
+ XnfCollectorTaskImpl collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+
FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -123,7 +130,7 @@ public class XnfCollectorTaskImplTest {
FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
.password("").port(PORT_22).build();
when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(Boolean.TRUE);
+ .thenReturn(new FileCollectResult());
ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
.location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
@@ -133,15 +140,67 @@ public class XnfCollectorTaskImplTest {
.verifyComplete();
verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
- verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
- verify(ftpsClientMock).setTrustedCAPassword(TRUSTED_CA_PASSWORD);
- verifyNoMoreInteractions(ftpsClientMock);
+ verifyNoMoreInteractions(sftpClientMock);
+ }
+
+ @Test
+ public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() {
+ XnfCollectorTaskImpl collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ collectorUndetTest.setRetryTimer(retryTimerMock);
+
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
+ .password(PWD).port(PORT_22).build();
+ ErrorData errorData = new ErrorData();
+ errorData.addError("Unable to collect file.", new Exception());
+ when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(new FileCollectResult(errorData));
+ doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+
+ verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpsClientMock, times(2)).retryCollectFile();
+ }
+
+ @Test
+ public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() {
+ XnfCollectorTaskImpl collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ collectorUndetTest.setRetryTimer(retryTimerMock);
+
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
+ .password(PWD).port(PORT_22).build();
+ ErrorData errorData = new ErrorData();
+ errorData.addError("Unable to collect file.", new Exception());
+ when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(new FileCollectResult(errorData));
+ doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile();
+
+ ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+ .verifyComplete();
+
+
+ verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpsClientMock, times(1)).retryCollectFile();
}
@Test
public void whenWrongScheme_returnEmpty() {
+ XnfCollectorTaskImpl collectorUndetTest = new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+
FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location("http://host.com/file.zip")
.compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -150,14 +209,10 @@ public class XnfCollectorTaskImplTest {
FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
.password("").port(PORT_22).build();
when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(Boolean.TRUE);
+ .thenReturn(new FileCollectResult());
StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
- verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
- verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
- verify(ftpsClientMock).setTrustedCAPassword(TRUSTED_CA_PASSWORD);
- verifyNoMoreInteractions(ftpsClientMock);
+ verifyNoMoreInteractions(sftpClientMock);
}
}