aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
commitbe8fa8158899180fccc753cf6690514bd9fcdb6a (patch)
tree665b1d907998901557d72e81c1c0cfb8c634020a /datafile-dmaap-client/src/main
parentd9a495306410ea3dc4b9fbfc8e1e99fd32dd77f6 (diff)
Running of file collection in paralell
Each FileReady message is run in a separate thread to increase the thoughput. Fetching of files from PNFs is retryed by using the reactive framework. Robustness to temporary failures is increased by retrying to publish fetched files. Fixed so that well known ports (FTPS/SFTP) are used if omitted in the FileReady message URL. Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java47
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java15
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java35
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java48
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java258
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java94
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java7
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java78
18 files changed, 253 insertions, 416 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
deleted file mode 100644
index c62f349b..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ErrorData {
- private List<String> errorMessages = new ArrayList<>();
- private List<Throwable> errorCauses = new ArrayList<>();
-
- public void addError(String errorMessage, Throwable errorCause) {
- errorMessages.add(errorMessage);
- errorCauses.add(errorCause);
- }
-
- @Override
- public String toString() {
- StringBuilder message = new StringBuilder();
- for (int i = 0; i < errorMessages.size(); i++) {
- message.append(errorMessages.get(i));
- if (errorCauses.get(i) != null) {
- message.append(" Cause: ").append(errorCauses.get(i));
- }
- if (i < errorMessages.size() -1) {
- message.append("\n");
- }
- }
- return message.toString();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 4b7cc01a..29160c94 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,11 +18,10 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
-
import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class FTPSClientWrapper implements IFTPSClient {
private FTPSClient ftpsClient = new FTPSClient();
@@ -88,8 +87,14 @@ public class FTPSClientWrapper implements IFTPSClient {
}
@Override
- public boolean retrieveFile(String remote, OutputStream local) throws IOException {
- return ftpsClient.retrieveFile(remote, local);
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
+ try {
+ if (!ftpsClient.retrieveFile(remote, local)) {
+ throw new DatafileTaskException("could not retrieve file");
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException(e);
+ }
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index 42addbf8..f330b673 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,37 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public abstract class FileCollectClient {
- protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
-
- protected FileServerData fileServerData;
- protected String remoteFile;
- protected String localFile;
- protected ErrorData errorData;
-
- public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
- logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
- remoteFile, localFile);
-
- this.fileServerData = fileServerData;
- this.remoteFile = remoteFile;
- this.localFile = localFile;
-
- return retryCollectFile();
- }
-
- public abstract FileCollectResult retryCollectFile();
-
- protected void addError(String errorMessage, Throwable errorCause) {
- if (errorData == null) {
- errorData = new ErrorData();
- }
- errorData.addError(errorMessage, errorCause);
- }
+public interface FileCollectClient {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
deleted file mode 100644
index fa1d4310..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-public class FileCollectResult {
- private boolean result;
- private ErrorData errorData;
-
- public FileCollectResult() {
- this.result = true;
- }
-
- public FileCollectResult(ErrorData errorData) {
- this.errorData = errorData;
- result = false;
- }
-
- public boolean downloadSuccessful() {
- return result;
- }
-
- public String getErrorData() {
- if (errorData != null) {
- return errorData.toString();
- }
- return "";
- }
-
- @Override
- public String toString() {
- return "FileCollectResult: "
- + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index d4eca4d7..b080c320 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
+import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -27,5 +29,5 @@ public interface FileServerData {
public String serverAddress();
public String userId();
public String password();
- public int port();
+ public Optional<Integer> port();
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 0d055fc1..461b2200 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
-import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -37,118 +40,106 @@ import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper;
-import org.springframework.stereotype.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
-@Component
-public class FtpsClient extends FileCollectClient {
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
private String keyCertPath;
private String keyCertPassword;
- private String trustedCAPath;
+ private Path trustedCAPath;
private String trustedCAPassword;
- private IFTPSClient realFtpsClient;
- private IKeyManagerUtils kmu;
+ private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+ private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
private IKeyStore keyStore;
private ITrustManagerFactory trustManagerFactory;
- private IFile lf;
- private IFileSystemResource fileResource;
- private IOutputStream os;
+ private IFile localFile = new FileWrapper();
+ private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
+ private IOutputStream outputStream;
private boolean keyManagerSet = false;
private boolean trustManagerSet = false;
+ private final FileServerData fileServerData;
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
-
- FileCollectResult fileCollectResult;
- IFTPSClient ftps = getFtpsClient();
+ public FtpsClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- ftps.setNeedClientAuth(true);
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
- if (getFileFromxNF(ftps)) {
- fileCollectResult = new FileCollectResult();
- } else {
- fileCollectResult = new FileCollectResult(errorData);
- }
- } else {
- fileCollectResult = new FileCollectResult(errorData);
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ setUpKeyManager(realFtpsClient);
+ setUpTrustedCA(realFtpsClient);
+ setUpConnection(realFtpsClient);
+ getFileFromxNF(realFtpsClient, remoteFile, localFile);
+ } catch (IOException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException("Could not open connection: " + e);
+ } catch (KeyManagerException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException(e);
+ } finally {
+ closeDownConnection(realFtpsClient);
}
- closeDownConnection(ftps);
- logger.trace("retryCollectFile left with result: {}", fileCollectResult);
- return fileCollectResult;
+ logger.trace("collectFile fetched: {}", localFile);
}
- private boolean setUpKeyManager(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
if (keyManagerSet) {
logger.trace("keyManager already set!");
- return result;
- }
- try {
- IKeyManagerUtils keyManagerUtils = getKeyManagerUtils();
+ } else {
keyManagerUtils.setCredentials(keyCertPath, keyCertPassword);
ftps.setKeyManager(keyManagerUtils.getClientKeyManager());
keyManagerSet = true;
- } catch (KeyManagerException e) {
- addError("Unable to use own key store " + keyCertPath, e);
- result = false;
}
logger.trace("complete setUpKeyManager");
- return result;
}
- private boolean setUpTrustedCA(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
if (trustManagerSet) {
logger.trace("trustManager already set!");
- return result;
- }
- try {
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(trustedCAPath);
- InputStream fis = fileSystemResource.getInputStream();
- IKeyStore ks = getKeyStore();
- ks.load(fis, trustedCAPassword.toCharArray());
- fis.close();
- ITrustManagerFactory tmf = getTrustManagerFactory();
- tmf.init(ks.getKeyStore());
- ftps.setTrustManager(tmf.getTrustManagers()[0]);
- trustManagerSet = true;
-
- } catch (Exception e) {
- addError("Unable to trust xNF's CA, " + trustedCAPath, e);
- result = false;
+ } else {
+ try {
+ fileSystemResource.setPath(trustedCAPath);
+ InputStream fis = fileSystemResource.getInputStream();
+ IKeyStore ks = getKeyStore();
+ ks.load(fis, trustedCAPassword.toCharArray());
+ fis.close();
+ ITrustManagerFactory tmf = getTrustManagerFactory();
+ tmf.init(ks.getKeyStore());
+ ftps.setTrustManager(tmf.getTrustManagers()[0]);
+ trustManagerSet = true;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e);
+ }
}
logger.trace("complete setUpTrustedCA");
- return result;
}
- private boolean setUpConnection(IFTPSClient ftps) {
- boolean result = true;
- try {
- if (ftps.isConnected()) {
- addError(
- "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
- null);
- return false;
- }
- ftps.connect(fileServerData.serverAddress(), fileServerData.port());
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 21;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+ if (!ftps.isConnected()) {
+ ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
logger.trace("after ftp connect");
- boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
- if (!loginSuccesful) {
- closeDownConnection(ftps);
- addError("Unable to log in to xNF. " + fileServerData, null);
- return false;
+
+ if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
}
- if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
+ if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
ftps.enterLocalPassiveMode();
ftps.setFileType(FTP.BINARY_FILE_TYPE);
// Set protection buffer size
@@ -157,54 +148,29 @@ public class FtpsClient extends FileCollectClient {
ftps.execPROT("P");
ftps.setBufferSize(1024 * 1024);
} else {
- closeDownConnection(ftps);
- addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
- null);
- return false;
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + ftps.getReplyCode());
}
- } catch (Exception e) {
- logger.trace("connect to ftp server failed.", e);
- addError("Unable to connect to xNF. Data: " + fileServerData, e);
- closeDownConnection(ftps);
- return false;
}
logger.trace("setUpConnection successfully!");
- return result;
}
- private boolean getFileFromxNF(IFTPSClient ftps) {
+ private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
+ throws IOException, DatafileTaskException {
logger.trace("starting to getFile");
- boolean result = true;
- IFile outfile = getFile();
- try {
- outfile.setPath(localFile);
- outfile.createNewFile();
-
- IOutputStream outputStream = getOutputStream();
- OutputStream output = outputStream.getOutputStream(outfile.getFile());
- logger.trace("begin to retrieve from xNF.");
- result = ftps.retrieveFile(remoteFile, output);
- logger.trace("end retrieve from xNF.");
- if (!result) {
- output.close();
- logger.debug("Unable to retrieve file from xNF. Cause unknown!");
- addError("Unable to retrieve file from xNF. Cause unknown!", null);
- return result;
- }
- output.close();
- logger.debug("File {} Download Successfull from xNF", localFile);
- } catch (IOException ex) {
- addError("Unable to collect file from xNF. Data: " + fileServerData, ex);
- try {
- outfile.delete();
- } catch (Exception e) {
- logger.trace("Unable to delete file {}.", localFile, e);
- }
- return false;
- }
- return result;
+
+ this.localFile.setPath(localFileName);
+ this.localFile.createNewFile();
+
+ OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+ logger.trace("begin to retrieve from xNF.");
+ ftps.retrieveFile(remoteFileName, output);
+ logger.trace("end retrieve from xNF.");
+ output.close();
+ logger.debug("File {} Download Successfull from xNF", localFileName);
}
+
private void closeDownConnection(IFTPSClient ftps) {
logger.trace("starting to closeDownConnection");
if (ftps != null && ftps.isConnected()) {
@@ -232,7 +198,7 @@ public class FtpsClient extends FileCollectClient {
}
public void setTrustedCAPath(String trustedCAPath) {
- this.trustedCAPath = trustedCAPath;
+ this.trustedCAPath = Paths.get(trustedCAPath);
}
public void setTrustedCAPassword(String trustedCAPassword) {
@@ -246,21 +212,6 @@ public class FtpsClient extends FileCollectClient {
return trustManagerFactory;
}
- private IFTPSClient getFtpsClient() {
- if (realFtpsClient == null) {
- realFtpsClient = new FTPSClientWrapper();
- }
- return realFtpsClient;
- }
-
- private IKeyManagerUtils getKeyManagerUtils() {
- if (kmu == null) {
- kmu = new KeyManagerUtilsWrapper();
- }
-
- return kmu;
- }
-
private IKeyStore getKeyStore() throws KeyStoreException {
if (keyStore == null) {
keyStore = new KeyStoreWrapper();
@@ -269,54 +220,31 @@ public class FtpsClient extends FileCollectClient {
return keyStore;
}
- private IFile getFile() {
- if (lf == null) {
- lf = new FileWrapper();
- }
-
- return lf;
- }
-
- private IOutputStream getOutputStream() {
- if (os == null) {
- os = new OutputStreamWrapper();
- }
-
- return os;
- }
-
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFtpsClient(IFTPSClient ftpsClient) {
+ void setFtpsClient(IFTPSClient ftpsClient) {
this.realFtpsClient = ftpsClient;
}
- protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
- this.kmu = keyManagerUtils;
+ void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
+ this.keyManagerUtils = keyManagerUtils;
}
- protected void setKeyStore(IKeyStore keyStore) {
+ void setKeyStore(IKeyStore keyStore) {
this.keyStore = keyStore;
}
- protected void setTrustManagerFactory(ITrustManagerFactory tmf) {
+ void setTrustManagerFactory(ITrustManagerFactory tmf) {
trustManagerFactory = tmf;
}
- protected void setFile(IFile file) {
- lf = file;
+ void setFile(IFile file) {
+ localFile = file;
}
- protected void setOutputStream(IOutputStream outputStream) {
- os = outputStream;
+ void setOutputStream(IOutputStream outputStream) {
+ this.outputStream = outputStream;
}
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ this.fileSystemResource = fileSystemResource;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index 1a581636..3dcaa656 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,9 +18,9 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public interface IFTPSClient {
public void setNeedClientAuth(boolean isNeedClientAuth);
@@ -51,7 +51,7 @@ public interface IFTPSClient {
public void execPROT(String prot) throws IOException;
- public boolean retrieveFile(String remote, OutputStream local) throws IOException;
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
void setTimeout(Integer t);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..d469da66
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException("DFC does not support protocol " + schemeString
+ + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e8fc695a..0c6491b8 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,10 +21,13 @@ import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-import org.apache.commons.io.FilenameUtils;
-import org.springframework.stereotype.Component;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with SFTP protocol.
@@ -32,65 +35,52 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*
*/
-@Component
-public class SftpClient extends FileCollectClient {
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+ private final FileServerData fileServerData;
- FileCollectResult result;
- Session session = setUpSession(fileServerData);
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- if (session != null) {
- ChannelSftp sftpChannel = getChannel(session, fileServerData);
- if (sftpChannel != null) {
- try {
- sftpChannel.get(remoteFile, localFile);
- result = new FileCollectResult();
- logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile));
- } catch (SftpException e) {
- addError("Unable to get file from xNF. Data: " + fileServerData, e);
- result = new FileCollectResult(errorData);
- }
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- sftpChannel.exit();
- } else {
- result = new FileCollectResult(errorData);
- }
+ try {
+ Session session = setUpSession(fileServerData);
+ ChannelSftp sftpChannel = getChannel(session);
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ sftpChannel.exit();
session.disconnect();
- } else {
- result = new FileCollectResult(errorData);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e);
}
- logger.trace("retryCollectFile left with result: {}", result);
- return result;
+
+ logger.trace("collectFile OK");
+
}
- private Session setUpSession(FileServerData fileServerData) {
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 22;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
- Session session = null;
- try {
- session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port());
- session.setConfig("StrictHostKeyChecking", "no");
- session.setPassword(fileServerData.password());
- session.connect();
- } catch (JSchException e) {
- addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e);
- session = null;
- }
+ Session session =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ session.setConfig("StrictHostKeyChecking", "no");
+ session.setPassword(fileServerData.password());
+ session.connect();
return session;
}
- private ChannelSftp getChannel(Session session, FileServerData fileServerData) {
- ChannelSftp sftpChannel = null;
- try {
- Channel channel;
- channel = session.openChannel("sftp");
- channel.connect();
- sftpChannel = (ChannelSftp) channel;
- } catch (JSchException e) {
- addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e);
- }
- return sftpChannel;
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
index 95de2de8..5295b124 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
-
+import java.nio.file.Path;
import org.springframework.core.io.FileSystemResource;
public class FileSystemResourceWrapper implements IFileSystemResource {
private FileSystemResource realResource;
@Override
- public void setPath(String path) {
+ public void setPath(Path path) {
realResource = new FileSystemResource(path);
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
index 32b6c72f..203a5985 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,13 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public class FileWrapper implements IFile {
private File file;
@Override
- public void setPath(String path) {
- file = new File(path);
+ public void setPath(Path path) {
+ file = path.toFile();
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
index a7094f69..2b95842f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public interface IFile {
- public void setPath(String path);
+ public void setPath(Path path);
public boolean createNewFile() throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
index db303969..23f14a33 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,10 +18,11 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
public interface IFileSystemResource {
- public void setPath(String filePath);
+ public void setPath(Path filePath);
public InputStream getInputStream() throws IOException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
index 1ef790c0..8015ea76 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
index 830a571c..88787826 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 2e9c8488..e99b8114 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 2b44233f..1e1187ac 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b3c8c3ef..bced3d85 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
@@ -36,17 +41,16 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Flux;
@@ -73,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
private final String user;
private final String pwd;
- private IFileSystemResource fileResource;
+ private IFileSystemResource fileResource = new FileSystemResourceWrapper();
private CloseableHttpAsyncClient webClient;
/**
@@ -97,10 +101,10 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR");
+ logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
webClient = getWebClient();
webClient.start();
@@ -114,20 +118,10 @@ public class DmaapProducerReactiveHttpClient {
HttpResponse response = future.get();
logger.trace(response.toString());
webClient.close();
- handleHttpResponse(response);
- return Flux.just(response.toString());
+ return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
- return Flux.empty();
- }
- }
-
- private void handleHttpResponse(HttpResponse response) {
- int statusCode = response.getStatusLine().getStatusCode();
- if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
- logger.trace("Publish to DR successful!");
- } else {
- logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Flux.error(e);
}
}
@@ -142,28 +136,20 @@ public class DmaapProducerReactiveHttpClient {
private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
- String fileLocation = model.getInternalLocation();
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(fileLocation);
- InputStream fileInputStream = null;
- try {
- fileInputStream = fileSystemResource.getInputStream();
- } catch (IOException e) {
- logger.error("Unable to get stream from filesystem.", e);
- }
- try {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- } catch (IOException e) {
- logger.error("Unable to set put request body from ByteArray.", e);
- }
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = Paths.get(model.getInternalLocation());
+ this.fileResource.setPath(fileLocation);
+ InputStream fileInputStream = fileResource.getInputStream();
+
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+
}
private URI getUri(String fileName) {
@@ -172,27 +158,19 @@ public class DmaapProducerReactiveHttpClient {
.path(path).build();
}
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
fileResource = fileSystemResource;
}
- protected CloseableHttpAsyncClient getWebClient() {
+ protected CloseableHttpAsyncClient getWebClient()
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
if (webClient != null) {
return webClient;
}
SSLContext sslContext = null;
- try {
- sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- } catch (Exception e) {
- logger.trace("Unable to get sslContext.", e);
- }
+
+ sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
//@formatter:off
return HttpAsyncClients.custom()
.setSSLContext(sslContext)
@@ -205,4 +183,4 @@ public class DmaapProducerReactiveHttpClient {
protected void setWebClient(CloseableHttpAsyncClient client) {
this.webClient = client;
}
-}
+} \ No newline at end of file