aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java47
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java15
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java35
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java48
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java258
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java94
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java7
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java78
18 files changed, 253 insertions, 416 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
deleted file mode 100644
index c62f349b..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ErrorData {
- private List<String> errorMessages = new ArrayList<>();
- private List<Throwable> errorCauses = new ArrayList<>();
-
- public void addError(String errorMessage, Throwable errorCause) {
- errorMessages.add(errorMessage);
- errorCauses.add(errorCause);
- }
-
- @Override
- public String toString() {
- StringBuilder message = new StringBuilder();
- for (int i = 0; i < errorMessages.size(); i++) {
- message.append(errorMessages.get(i));
- if (errorCauses.get(i) != null) {
- message.append(" Cause: ").append(errorCauses.get(i));
- }
- if (i < errorMessages.size() -1) {
- message.append("\n");
- }
- }
- return message.toString();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 4b7cc01a..29160c94 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,11 +18,10 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
-
import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class FTPSClientWrapper implements IFTPSClient {
private FTPSClient ftpsClient = new FTPSClient();
@@ -88,8 +87,14 @@ public class FTPSClientWrapper implements IFTPSClient {
}
@Override
- public boolean retrieveFile(String remote, OutputStream local) throws IOException {
- return ftpsClient.retrieveFile(remote, local);
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
+ try {
+ if (!ftpsClient.retrieveFile(remote, local)) {
+ throw new DatafileTaskException("could not retrieve file");
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException(e);
+ }
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index 42addbf8..f330b673 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,37 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public abstract class FileCollectClient {
- protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
-
- protected FileServerData fileServerData;
- protected String remoteFile;
- protected String localFile;
- protected ErrorData errorData;
-
- public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
- logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
- remoteFile, localFile);
-
- this.fileServerData = fileServerData;
- this.remoteFile = remoteFile;
- this.localFile = localFile;
-
- return retryCollectFile();
- }
-
- public abstract FileCollectResult retryCollectFile();
-
- protected void addError(String errorMessage, Throwable errorCause) {
- if (errorData == null) {
- errorData = new ErrorData();
- }
- errorData.addError(errorMessage, errorCause);
- }
+public interface FileCollectClient {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
deleted file mode 100644
index fa1d4310..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-public class FileCollectResult {
- private boolean result;
- private ErrorData errorData;
-
- public FileCollectResult() {
- this.result = true;
- }
-
- public FileCollectResult(ErrorData errorData) {
- this.errorData = errorData;
- result = false;
- }
-
- public boolean downloadSuccessful() {
- return result;
- }
-
- public String getErrorData() {
- if (errorData != null) {
- return errorData.toString();
- }
- return "";
- }
-
- @Override
- public String toString() {
- return "FileCollectResult: "
- + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index d4eca4d7..b080c320 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
+import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -27,5 +29,5 @@ public interface FileServerData {
public String serverAddress();
public String userId();
public String password();
- public int port();
+ public Optional<Integer> port();
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 0d055fc1..461b2200 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
-import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -37,118 +40,106 @@ import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper;
-import org.springframework.stereotype.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
-@Component
-public class FtpsClient extends FileCollectClient {
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
private String keyCertPath;
private String keyCertPassword;
- private String trustedCAPath;
+ private Path trustedCAPath;
private String trustedCAPassword;
- private IFTPSClient realFtpsClient;
- private IKeyManagerUtils kmu;
+ private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+ private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
private IKeyStore keyStore;
private ITrustManagerFactory trustManagerFactory;
- private IFile lf;
- private IFileSystemResource fileResource;
- private IOutputStream os;
+ private IFile localFile = new FileWrapper();
+ private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
+ private IOutputStream outputStream;
private boolean keyManagerSet = false;
private boolean trustManagerSet = false;
+ private final FileServerData fileServerData;
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
-
- FileCollectResult fileCollectResult;
- IFTPSClient ftps = getFtpsClient();
+ public FtpsClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- ftps.setNeedClientAuth(true);
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
- if (getFileFromxNF(ftps)) {
- fileCollectResult = new FileCollectResult();
- } else {
- fileCollectResult = new FileCollectResult(errorData);
- }
- } else {
- fileCollectResult = new FileCollectResult(errorData);
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ setUpKeyManager(realFtpsClient);
+ setUpTrustedCA(realFtpsClient);
+ setUpConnection(realFtpsClient);
+ getFileFromxNF(realFtpsClient, remoteFile, localFile);
+ } catch (IOException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException("Could not open connection: " + e);
+ } catch (KeyManagerException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException(e);
+ } finally {
+ closeDownConnection(realFtpsClient);
}
- closeDownConnection(ftps);
- logger.trace("retryCollectFile left with result: {}", fileCollectResult);
- return fileCollectResult;
+ logger.trace("collectFile fetched: {}", localFile);
}
- private boolean setUpKeyManager(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
if (keyManagerSet) {
logger.trace("keyManager already set!");
- return result;
- }
- try {
- IKeyManagerUtils keyManagerUtils = getKeyManagerUtils();
+ } else {
keyManagerUtils.setCredentials(keyCertPath, keyCertPassword);
ftps.setKeyManager(keyManagerUtils.getClientKeyManager());
keyManagerSet = true;
- } catch (KeyManagerException e) {
- addError("Unable to use own key store " + keyCertPath, e);
- result = false;
}
logger.trace("complete setUpKeyManager");
- return result;
}
- private boolean setUpTrustedCA(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
if (trustManagerSet) {
logger.trace("trustManager already set!");
- return result;
- }
- try {
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(trustedCAPath);
- InputStream fis = fileSystemResource.getInputStream();
- IKeyStore ks = getKeyStore();
- ks.load(fis, trustedCAPassword.toCharArray());
- fis.close();
- ITrustManagerFactory tmf = getTrustManagerFactory();
- tmf.init(ks.getKeyStore());
- ftps.setTrustManager(tmf.getTrustManagers()[0]);
- trustManagerSet = true;
-
- } catch (Exception e) {
- addError("Unable to trust xNF's CA, " + trustedCAPath, e);
- result = false;
+ } else {
+ try {
+ fileSystemResource.setPath(trustedCAPath);
+ InputStream fis = fileSystemResource.getInputStream();
+ IKeyStore ks = getKeyStore();
+ ks.load(fis, trustedCAPassword.toCharArray());
+ fis.close();
+ ITrustManagerFactory tmf = getTrustManagerFactory();
+ tmf.init(ks.getKeyStore());
+ ftps.setTrustManager(tmf.getTrustManagers()[0]);
+ trustManagerSet = true;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e);
+ }
}
logger.trace("complete setUpTrustedCA");
- return result;
}
- private boolean setUpConnection(IFTPSClient ftps) {
- boolean result = true;
- try {
- if (ftps.isConnected()) {
- addError(
- "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
- null);
- return false;
- }
- ftps.connect(fileServerData.serverAddress(), fileServerData.port());
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 21;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+ if (!ftps.isConnected()) {
+ ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
logger.trace("after ftp connect");
- boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
- if (!loginSuccesful) {
- closeDownConnection(ftps);
- addError("Unable to log in to xNF. " + fileServerData, null);
- return false;
+
+ if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
}
- if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
+ if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
ftps.enterLocalPassiveMode();
ftps.setFileType(FTP.BINARY_FILE_TYPE);
// Set protection buffer size
@@ -157,54 +148,29 @@ public class FtpsClient extends FileCollectClient {
ftps.execPROT("P");
ftps.setBufferSize(1024 * 1024);
} else {
- closeDownConnection(ftps);
- addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
- null);
- return false;
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + ftps.getReplyCode());
}
- } catch (Exception e) {
- logger.trace("connect to ftp server failed.", e);
- addError("Unable to connect to xNF. Data: " + fileServerData, e);
- closeDownConnection(ftps);
- return false;
}
logger.trace("setUpConnection successfully!");
- return result;
}
- private boolean getFileFromxNF(IFTPSClient ftps) {
+ private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
+ throws IOException, DatafileTaskException {
logger.trace("starting to getFile");
- boolean result = true;
- IFile outfile = getFile();
- try {
- outfile.setPath(localFile);
- outfile.createNewFile();
-
- IOutputStream outputStream = getOutputStream();
- OutputStream output = outputStream.getOutputStream(outfile.getFile());
- logger.trace("begin to retrieve from xNF.");
- result = ftps.retrieveFile(remoteFile, output);
- logger.trace("end retrieve from xNF.");
- if (!result) {
- output.close();
- logger.debug("Unable to retrieve file from xNF. Cause unknown!");
- addError("Unable to retrieve file from xNF. Cause unknown!", null);
- return result;
- }
- output.close();
- logger.debug("File {} Download Successfull from xNF", localFile);
- } catch (IOException ex) {
- addError("Unable to collect file from xNF. Data: " + fileServerData, ex);
- try {
- outfile.delete();
- } catch (Exception e) {
- logger.trace("Unable to delete file {}.", localFile, e);
- }
- return false;
- }
- return result;
+
+ this.localFile.setPath(localFileName);
+ this.localFile.createNewFile();
+
+ OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+ logger.trace("begin to retrieve from xNF.");
+ ftps.retrieveFile(remoteFileName, output);
+ logger.trace("end retrieve from xNF.");
+ output.close();
+ logger.debug("File {} Download Successfull from xNF", localFileName);
}
+
private void closeDownConnection(IFTPSClient ftps) {
logger.trace("starting to closeDownConnection");
if (ftps != null && ftps.isConnected()) {
@@ -232,7 +198,7 @@ public class FtpsClient extends FileCollectClient {
}
public void setTrustedCAPath(String trustedCAPath) {
- this.trustedCAPath = trustedCAPath;
+ this.trustedCAPath = Paths.get(trustedCAPath);
}
public void setTrustedCAPassword(String trustedCAPassword) {
@@ -246,21 +212,6 @@ public class FtpsClient extends FileCollectClient {
return trustManagerFactory;
}
- private IFTPSClient getFtpsClient() {
- if (realFtpsClient == null) {
- realFtpsClient = new FTPSClientWrapper();
- }
- return realFtpsClient;
- }
-
- private IKeyManagerUtils getKeyManagerUtils() {
- if (kmu == null) {
- kmu = new KeyManagerUtilsWrapper();
- }
-
- return kmu;
- }
-
private IKeyStore getKeyStore() throws KeyStoreException {
if (keyStore == null) {
keyStore = new KeyStoreWrapper();
@@ -269,54 +220,31 @@ public class FtpsClient extends FileCollectClient {
return keyStore;
}
- private IFile getFile() {
- if (lf == null) {
- lf = new FileWrapper();
- }
-
- return lf;
- }
-
- private IOutputStream getOutputStream() {
- if (os == null) {
- os = new OutputStreamWrapper();
- }
-
- return os;
- }
-
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFtpsClient(IFTPSClient ftpsClient) {
+ void setFtpsClient(IFTPSClient ftpsClient) {
this.realFtpsClient = ftpsClient;
}
- protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
- this.kmu = keyManagerUtils;
+ void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
+ this.keyManagerUtils = keyManagerUtils;
}
- protected void setKeyStore(IKeyStore keyStore) {
+ void setKeyStore(IKeyStore keyStore) {
this.keyStore = keyStore;
}
- protected void setTrustManagerFactory(ITrustManagerFactory tmf) {
+ void setTrustManagerFactory(ITrustManagerFactory tmf) {
trustManagerFactory = tmf;
}
- protected void setFile(IFile file) {
- lf = file;
+ void setFile(IFile file) {
+ localFile = file;
}
- protected void setOutputStream(IOutputStream outputStream) {
- os = outputStream;
+ void setOutputStream(IOutputStream outputStream) {
+ this.outputStream = outputStream;
}
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ this.fileSystemResource = fileSystemResource;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index 1a581636..3dcaa656 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,9 +18,9 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public interface IFTPSClient {
public void setNeedClientAuth(boolean isNeedClientAuth);
@@ -51,7 +51,7 @@ public interface IFTPSClient {
public void execPROT(String prot) throws IOException;
- public boolean retrieveFile(String remote, OutputStream local) throws IOException;
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
void setTimeout(Integer t);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..d469da66
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException("DFC does not support protocol " + schemeString
+ + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e8fc695a..0c6491b8 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,10 +21,13 @@ import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-import org.apache.commons.io.FilenameUtils;
-import org.springframework.stereotype.Component;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with SFTP protocol.
@@ -32,65 +35,52 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*
*/
-@Component
-public class SftpClient extends FileCollectClient {
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+ private final FileServerData fileServerData;
- FileCollectResult result;
- Session session = setUpSession(fileServerData);
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- if (session != null) {
- ChannelSftp sftpChannel = getChannel(session, fileServerData);
- if (sftpChannel != null) {
- try {
- sftpChannel.get(remoteFile, localFile);
- result = new FileCollectResult();
- logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile));
- } catch (SftpException e) {
- addError("Unable to get file from xNF. Data: " + fileServerData, e);
- result = new FileCollectResult(errorData);
- }
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- sftpChannel.exit();
- } else {
- result = new FileCollectResult(errorData);
- }
+ try {
+ Session session = setUpSession(fileServerData);
+ ChannelSftp sftpChannel = getChannel(session);
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ sftpChannel.exit();
session.disconnect();
- } else {
- result = new FileCollectResult(errorData);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e);
}
- logger.trace("retryCollectFile left with result: {}", result);
- return result;
+
+ logger.trace("collectFile OK");
+
}
- private Session setUpSession(FileServerData fileServerData) {
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 22;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
- Session session = null;
- try {
- session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port());
- session.setConfig("StrictHostKeyChecking", "no");
- session.setPassword(fileServerData.password());
- session.connect();
- } catch (JSchException e) {
- addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e);
- session = null;
- }
+ Session session =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ session.setConfig("StrictHostKeyChecking", "no");
+ session.setPassword(fileServerData.password());
+ session.connect();
return session;
}
- private ChannelSftp getChannel(Session session, FileServerData fileServerData) {
- ChannelSftp sftpChannel = null;
- try {
- Channel channel;
- channel = session.openChannel("sftp");
- channel.connect();
- sftpChannel = (ChannelSftp) channel;
- } catch (JSchException e) {
- addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e);
- }
- return sftpChannel;
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
index 95de2de8..5295b124 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
-
+import java.nio.file.Path;
import org.springframework.core.io.FileSystemResource;
public class FileSystemResourceWrapper implements IFileSystemResource {
private FileSystemResource realResource;
@Override
- public void setPath(String path) {
+ public void setPath(Path path) {
realResource = new FileSystemResource(path);
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
index 32b6c72f..203a5985 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,13 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public class FileWrapper implements IFile {
private File file;
@Override
- public void setPath(String path) {
- file = new File(path);
+ public void setPath(Path path) {
+ file = path.toFile();
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
index a7094f69..2b95842f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public interface IFile {
- public void setPath(String path);
+ public void setPath(Path path);
public boolean createNewFile() throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
index db303969..23f14a33 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,10 +18,11 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
public interface IFileSystemResource {
- public void setPath(String filePath);
+ public void setPath(Path filePath);
public InputStream getInputStream() throws IOException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
index 1ef790c0..8015ea76 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
index 830a571c..88787826 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 2e9c8488..e99b8114 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 2b44233f..1e1187ac 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b3c8c3ef..bced3d85 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
@@ -36,17 +41,16 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Flux;
@@ -73,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
private final String user;
private final String pwd;
- private IFileSystemResource fileResource;
+ private IFileSystemResource fileResource = new FileSystemResourceWrapper();
private CloseableHttpAsyncClient webClient;
/**
@@ -97,10 +101,10 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR");
+ logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
webClient = getWebClient();
webClient.start();
@@ -114,20 +118,10 @@ public class DmaapProducerReactiveHttpClient {
HttpResponse response = future.get();
logger.trace(response.toString());
webClient.close();
- handleHttpResponse(response);
- return Flux.just(response.toString());
+ return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
- return Flux.empty();
- }
- }
-
- private void handleHttpResponse(HttpResponse response) {
- int statusCode = response.getStatusLine().getStatusCode();
- if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
- logger.trace("Publish to DR successful!");
- } else {
- logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Flux.error(e);
}
}
@@ -142,28 +136,20 @@ public class DmaapProducerReactiveHttpClient {
private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
- String fileLocation = model.getInternalLocation();
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(fileLocation);
- InputStream fileInputStream = null;
- try {
- fileInputStream = fileSystemResource.getInputStream();
- } catch (IOException e) {
- logger.error("Unable to get stream from filesystem.", e);
- }
- try {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- } catch (IOException e) {
- logger.error("Unable to set put request body from ByteArray.", e);
- }
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = Paths.get(model.getInternalLocation());
+ this.fileResource.setPath(fileLocation);
+ InputStream fileInputStream = fileResource.getInputStream();
+
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+
}
private URI getUri(String fileName) {
@@ -172,27 +158,19 @@ public class DmaapProducerReactiveHttpClient {
.path(path).build();
}
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
fileResource = fileSystemResource;
}
- protected CloseableHttpAsyncClient getWebClient() {
+ protected CloseableHttpAsyncClient getWebClient()
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
if (webClient != null) {
return webClient;
}
SSLContext sslContext = null;
- try {
- sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- } catch (Exception e) {
- logger.trace("Unable to get sslContext.", e);
- }
+
+ sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
//@formatter:off
return HttpAsyncClients.custom()
.setSSLContext(sslContext)
@@ -205,4 +183,4 @@ public class DmaapProducerReactiveHttpClient {
protected void setWebClient(CloseableHttpAsyncClient client) {
this.webClient = client;
}
-}
+} \ No newline at end of file