summaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
commite1a66425d3ba1df5ae2a8f2b99168707e08b655a (patch)
treebdfcbdbe4a3dd7286a562f974dec61691e216848 /datafile-dmaap-client
parent4bd281390ed24b278846775c1157f82db81fddbe (diff)
Local filename updated, stability issues
The local filename is changed so it contains PNF name instead of the PNF IP address. The paralellism is restricted to 100 worker threads in order to solve problems with too many open file descriptors and out of memory. Logging is improved. Change-Id: I24ce2e23020cc253a3c7bebac1ab5cf703b5b144 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-dmaap-client')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java1
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java49
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java24
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java33
5 files changed, 56 insertions, 56 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index bca7dfd2..de50f24a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -25,4 +25,5 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
*/
public interface FileCollectClient extends AutoCloseable {
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+ public void open() throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 4283debf..492768c2 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
/**
- * Gets file from xNF with FTPS protocol.
+ * Gets file from PNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
@@ -53,12 +53,22 @@ public class FtpsClient implements FileCollectClient {
private final FileServerData fileServerData;
private static TrustManager theTrustManager = null;
- public FtpsClient(FileServerData fileServerData) {
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCAPath;
+ private final String trustedCAPassword;
+
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCAPath,
+ String trustedCAPassword) {
this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCAPath = trustedCAPath;
+ this.trustedCAPassword = trustedCAPassword;
}
- public void open(String keyCertPath, String keyCertPassword, Path trustedCAPath, String trustedCAPassword)
- throws DatafileTaskException {
+ @Override
+ public void open() throws DatafileTaskException {
try {
realFtpsClient.setNeedClientAuth(true);
realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
@@ -135,12 +145,24 @@ public class FtpsClient implements FileCollectClient {
logger.trace("setUpConnection successfully!");
}
- InputStream createInputStream(Path localFileName) throws IOException {
+ private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCAPath);
+ try (InputStream fis = createInputStream(trustedCAPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCAPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
FileSystemResource realResource = new FileSystemResource(localFileName);
return realResource.getInputStream();
}
- OutputStream createOutputStream(Path localFileName) throws IOException {
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
File localFile = localFileName.toFile();
if (localFile.createNewFile()) {
logger.warn("Local file {} already created", localFileName);
@@ -150,18 +172,7 @@ public class FtpsClient implements FileCollectClient {
return output;
}
- private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
- throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
- try (InputStream fis = createInputStream(trustedCAPath)) {
- KeyStore keyStore = KeyStore.getInstance("JKS");
- keyStore.load(fis, trustedCAPassword.toCharArray());
- TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
- factory.init(keyStore);
- return factory.getTrustManagers()[0];
- }
- }
-
- TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
+ protected TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
synchronized (FtpsClient.class) {
if (theTrustManager == null) {
@@ -171,7 +182,7 @@ public class FtpsClient implements FileCollectClient {
}
}
- KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
throws IOException, GeneralSecurityException {
return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index dec8af42..4517a755 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -47,7 +47,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
- logger.trace("collectFile called");
+ logger.trace("collectFile {}", localFile);
try {
sftpChannel.get(remoteFile, localFile.toString());
@@ -61,7 +61,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void close() {
- logger.trace("close");
+ logger.trace("closing sftp session");
if (sftpChannel != null) {
sftpChannel.exit();
sftpChannel = null;
@@ -72,6 +72,7 @@ public class SftpClient implements FileCollectClient {
}
}
+ @Override
public void open() throws DatafileTaskException {
try {
if (session == null) {
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9bd5d57f..944d3b34 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -57,11 +57,7 @@ public class DmaapProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final String dmaapHostName;
- private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
- private final String user;
- private final String pwd;
+ private final DmaapPublisherConfiguration configuration;
/**
* Constructor DmaapProducerReactiveHttpClient.
@@ -69,11 +65,7 @@ public class DmaapProducerReactiveHttpClient {
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
- this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.user = dmaapPublisherConfiguration.dmaapUserName();
- this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
+ this.configuration = dmaapPublisherConfiguration;
}
public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
@@ -85,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
logger.trace(INVOKE, "Starting to produce to DR {}", request);
Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
return response;
} catch (Exception e) {
throw new DatafileTaskException("Unable to create web client.", e);
@@ -101,7 +93,7 @@ public class DmaapProducerReactiveHttpClient {
logger.trace(INVOKE, "Starting to produce to DR {}", request);
Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
return response;
} catch (Exception e) {
throw new DatafileTaskException("Unable to create web client.", e);
@@ -109,7 +101,7 @@ public class DmaapProducerReactiveHttpClient {
}
public void addUserCredentialsToHead(HttpUriRequest request) {
- String plainCreds = user + ":" + pwd;
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
@@ -119,9 +111,9 @@ public class DmaapProducerReactiveHttpClient {
public UriBuilder getBaseUri() {
return new DefaultUriBuilderFactory().builder() //
- .scheme(dmaapProtocol) //
- .host(dmaapHostName) //
- .port(dmaapPortNumber);
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
}
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index e2882606..9e6c29f8 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -73,7 +73,8 @@ public class FtpsClientTest {
@BeforeEach
protected void setUp() throws Exception {
- clientUnderTestSpy = spy(new FtpsClient(createFileServerData()));
+ clientUnderTestSpy = spy(new FtpsClient(createFileServerData(), FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH,
+ TRUSTED_CA_PASSWORD));
clientUnderTestSpy.realFtpsClient = ftpsClientMock;
}
@@ -104,7 +105,7 @@ public class FtpsClientTest {
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
doReturn(true).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
@@ -124,9 +125,8 @@ public class FtpsClientTest {
public void collectFileFaultyOwnKey_shouldFail() throws Exception {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
- assertThatThrownBy(() -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH,
- TRUSTED_CA_PASSWORD)).hasMessageContaining(
- "Could not open connection: java.io.FileNotFoundException:");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessageContaining("Could not open connection: java.io.FileNotFoundException:");
verify(ftpsClientMock).setNeedClientAuth(true);
@@ -142,9 +142,8 @@ public class FtpsClientTest {
doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
doThrow(new IOException("problem")).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Could not open connection: java.io.IOException: problem");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.IOException: problem");
}
@Test
@@ -153,9 +152,8 @@ public class FtpsClientTest {
doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
doReturn(inputStreamMock).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Could not open connection: java.io.EOFException");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.EOFException");
}
@Test
@@ -166,9 +164,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(false).when(ftpsClientMock).login(USERNAME, PASSWORD);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Unable to log in to xNF. 127.0.0.1");
+ assertThatThrownBy(() -> clientUnderTestSpy.open()).hasMessage("Unable to log in to xNF. 127.0.0.1");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -185,9 +181,8 @@ public class FtpsClientTest {
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(503).when(ftpsClientMock).getReplyCode();
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -205,7 +200,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
@@ -224,7 +219,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
when(ftpsClientMock.isConnected()).thenReturn(false);
doThrow(new IOException("problem")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);