aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
commite1a66425d3ba1df5ae2a8f2b99168707e08b655a (patch)
treebdfcbdbe4a3dd7286a562f974dec61691e216848 /datafile-dmaap-client/src/main
parent4bd281390ed24b278846775c1157f82db81fddbe (diff)
Local filename updated, stability issues
The local filename is changed so it contains PNF name instead of the PNF IP address. The paralellism is restricted to 100 worker threads in order to solve problems with too many open file descriptors and out of memory. Logging is improved. Change-Id: I24ce2e23020cc253a3c7bebac1ab5cf703b5b144 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java1
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java49
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java24
4 files changed, 42 insertions, 37 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index bca7dfd2..de50f24a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -25,4 +25,5 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
*/
public interface FileCollectClient extends AutoCloseable {
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+ public void open() throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 4283debf..492768c2 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
/**
- * Gets file from xNF with FTPS protocol.
+ * Gets file from PNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
@@ -53,12 +53,22 @@ public class FtpsClient implements FileCollectClient {
private final FileServerData fileServerData;
private static TrustManager theTrustManager = null;
- public FtpsClient(FileServerData fileServerData) {
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCAPath;
+ private final String trustedCAPassword;
+
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCAPath,
+ String trustedCAPassword) {
this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCAPath = trustedCAPath;
+ this.trustedCAPassword = trustedCAPassword;
}
- public void open(String keyCertPath, String keyCertPassword, Path trustedCAPath, String trustedCAPassword)
- throws DatafileTaskException {
+ @Override
+ public void open() throws DatafileTaskException {
try {
realFtpsClient.setNeedClientAuth(true);
realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
@@ -135,12 +145,24 @@ public class FtpsClient implements FileCollectClient {
logger.trace("setUpConnection successfully!");
}
- InputStream createInputStream(Path localFileName) throws IOException {
+ private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCAPath);
+ try (InputStream fis = createInputStream(trustedCAPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCAPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
FileSystemResource realResource = new FileSystemResource(localFileName);
return realResource.getInputStream();
}
- OutputStream createOutputStream(Path localFileName) throws IOException {
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
File localFile = localFileName.toFile();
if (localFile.createNewFile()) {
logger.warn("Local file {} already created", localFileName);
@@ -150,18 +172,7 @@ public class FtpsClient implements FileCollectClient {
return output;
}
- private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
- throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
- try (InputStream fis = createInputStream(trustedCAPath)) {
- KeyStore keyStore = KeyStore.getInstance("JKS");
- keyStore.load(fis, trustedCAPassword.toCharArray());
- TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
- factory.init(keyStore);
- return factory.getTrustManagers()[0];
- }
- }
-
- TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
+ protected TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
synchronized (FtpsClient.class) {
if (theTrustManager == null) {
@@ -171,7 +182,7 @@ public class FtpsClient implements FileCollectClient {
}
}
- KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
throws IOException, GeneralSecurityException {
return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index dec8af42..4517a755 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -47,7 +47,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
- logger.trace("collectFile called");
+ logger.trace("collectFile {}", localFile);
try {
sftpChannel.get(remoteFile, localFile.toString());
@@ -61,7 +61,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void close() {
- logger.trace("close");
+ logger.trace("closing sftp session");
if (sftpChannel != null) {
sftpChannel.exit();
sftpChannel = null;
@@ -72,6 +72,7 @@ public class SftpClient implements FileCollectClient {
}
}
+ @Override
public void open() throws DatafileTaskException {
try {
if (session == null) {
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9bd5d57f..944d3b34 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -57,11 +57,7 @@ public class DmaapProducerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final String dmaapHostName;
- private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
- private final String user;
- private final String pwd;
+ private final DmaapPublisherConfiguration configuration;
/**
* Constructor DmaapProducerReactiveHttpClient.
@@ -69,11 +65,7 @@ public class DmaapProducerReactiveHttpClient {
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
- this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.user = dmaapPublisherConfiguration.dmaapUserName();
- this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
+ this.configuration = dmaapPublisherConfiguration;
}
public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
@@ -85,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
logger.trace(INVOKE, "Starting to produce to DR {}", request);
Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
return response;
} catch (Exception e) {
throw new DatafileTaskException("Unable to create web client.", e);
@@ -101,7 +93,7 @@ public class DmaapProducerReactiveHttpClient {
logger.trace(INVOKE, "Starting to produce to DR {}", request);
Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
return response;
} catch (Exception e) {
throw new DatafileTaskException("Unable to create web client.", e);
@@ -109,7 +101,7 @@ public class DmaapProducerReactiveHttpClient {
}
public void addUserCredentialsToHead(HttpUriRequest request) {
- String plainCreds = user + ":" + pwd;
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
@@ -119,9 +111,9 @@ public class DmaapProducerReactiveHttpClient {
public UriBuilder getBaseUri() {
return new DefaultUriBuilderFactory().builder() //
- .scheme(dmaapProtocol) //
- .host(dmaapHostName) //
- .port(dmaapPortNumber);
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
}
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)