summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/config/application.yaml1
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java12
-rw-r--r--datafile-dmaap-client/pom.xml9
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java31
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java19
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java54
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java22
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java119
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java58
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java65
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java45
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java130
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java71
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java55
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java34
-rw-r--r--pom.xml12
16 files changed, 400 insertions, 337 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index cef185c6..b66f7b6e 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -14,6 +14,7 @@ logging:
ROOT: ERROR
org.springframework: ERROR
org.springframework.data: ERROR
+ org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.dcaegen2.collectors.datafile: ERROR
file: opt/log/application.log
app:
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 171dd024..c465fe94 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -59,11 +60,18 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
-
+ //@formatter:off
consumeFromDmaapMessage()
+ .publishOn(Schedulers.parallel())
+ .cache()
.doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+ .flatMap(this::collectFilesFromXnf)
+ .retry(3)
+ .cache()
+ .flatMap(this::publishToDmaapConfiguration)
+ .retry(3)
.subscribe(this::onSuccess, this::onError, this::onComplete);
+ //@formatter:on
}
private void onComplete() {
diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml
index 52394ad5..c863f6a0 100644
--- a/datafile-dmaap-client/pom.xml
+++ b/datafile-dmaap-client/pom.xml
@@ -37,9 +37,16 @@
</properties>
<dependencies>
-
<!-- DEVELOPMENT DEPENDENCIES -->
<dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
<artifactId>datafile-commons</artifactId>
<version>${project.parent.version}</version>
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 2ccf1bab..4b7cc01a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -2,17 +2,15 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
@@ -75,6 +73,11 @@ public class FTPSClientWrapper implements IFTPSClient {
}
@Override
+ public void setFileType(int fileType) throws IOException {
+ ftpsClient.setFileType(fileType);
+ }
+
+ @Override
public void execPBSZ(int psbz) throws IOException {
ftpsClient.execPBSZ(psbz);
}
@@ -93,4 +96,14 @@ public class FTPSClientWrapper implements IFTPSClient {
public void setTimeout(Integer t) {
this.ftpsClient.setDefaultTimeout(t);
}
+
+ @Override
+ public boolean isConnected() {
+ return ftpsClient.isConnected();
+ }
+
+ @Override
+ public void setBufferSize(int bufSize) {
+ ftpsClient.setBufferSize(bufSize);
+ }
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
index c5962172..fa1d4310 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
@@ -2,17 +2,15 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
@@ -44,6 +42,7 @@ public class FileCollectResult {
@Override
public String toString() {
- return "Download successful: " + result + " Error data: " + getErrorData();
+ return "FileCollectResult: "
+ + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 120868c6..49b7b665 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -23,6 +23,7 @@ import java.security.GeneralSecurityException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
@@ -72,7 +73,6 @@ public class FtpsClient extends FileCollectClient {
if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
if (getFileFromxNF(ftps)) {
- closeDownConnection(ftps);
fileCollectResult = new FileCollectResult();
} else {
fileCollectResult = new FileCollectResult(errorData);
@@ -80,6 +80,7 @@ public class FtpsClient extends FileCollectClient {
} else {
fileCollectResult = new FileCollectResult(errorData);
}
+ closeDownConnection(ftps);
logger.trace("retryCollectFile left with result: {}", fileCollectResult);
return fileCollectResult;
}
@@ -87,6 +88,7 @@ public class FtpsClient extends FileCollectClient {
private boolean setUpKeyManager(IFTPSClient ftps) {
boolean result = true;
if (keyManagerSet) {
+ logger.trace("keyManager already set!");
return result;
}
try {
@@ -105,6 +107,7 @@ public class FtpsClient extends FileCollectClient {
private boolean setUpTrustedCA(IFTPSClient ftps) {
boolean result = true;
if (trustManagerSet) {
+ logger.trace("trustManager already set!");
return result;
}
try {
@@ -130,32 +133,42 @@ public class FtpsClient extends FileCollectClient {
private boolean setUpConnection(IFTPSClient ftps) {
boolean result = true;
try {
+ if (ftps.isConnected()) {
+ addError(
+ "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
+ null);
+ return false;
+ }
ftps.connect(fileServerData.serverAddress(), fileServerData.port());
logger.trace("after ftp connect");
boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
if (!loginSuccesful) {
- ftps.logout();
+ closeDownConnection(ftps);
addError("Unable to log in to xNF. " + fileServerData, null);
- result = false;
+ return false;
}
if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
ftps.enterLocalPassiveMode();
+ ftps.setFileType(FTP.BINARY_FILE_TYPE);
// Set protection buffer size
ftps.execPBSZ(0);
// Set data channel protection to private
ftps.execPROT("P");
+ ftps.setBufferSize(1024 * 1024);
} else {
- ftps.disconnect();
+ closeDownConnection(ftps);
addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
null);
- result = false;
+ return false;
}
- } catch (Exception ex) {
- addError("Unable to connect to xNF. Data: " + fileServerData, ex);
- result = false;
+ } catch (Exception e) {
+ logger.trace("connect to ftp server failed.", e);
+ addError("Unable to connect to xNF. Data: " + fileServerData, e);
+ closeDownConnection(ftps);
+ return false;
}
- logger.trace("setUpConnection return value: {}", result);
+ logger.trace("setUpConnection successfully!");
return result;
}
@@ -169,8 +182,9 @@ public class FtpsClient extends FileCollectClient {
IOutputStream outputStream = getOutputStream();
OutputStream output = outputStream.getOutputStream(outfile.getFile());
-
+ logger.trace("begin to retrieve from xNF.");
result = ftps.retrieveFile(remoteFile, output);
+ logger.trace("end retrieve from xNF.");
if (!result) {
output.close();
logger.debug("Unable to retrieve file from xNF. Cause unknown!");
@@ -184,22 +198,28 @@ public class FtpsClient extends FileCollectClient {
try {
outfile.delete();
} catch (Exception e) {
- // Nothing
+ logger.error("Unable to close file. {}", e);
}
- result = false;
+ return false;
}
return result;
}
private void closeDownConnection(IFTPSClient ftps) {
logger.trace("starting to closeDownConnection");
- try {
- if (ftps != null) {
- ftps.logout();
+ if (ftps != null && ftps.isConnected()) {
+ try {
+ boolean logOut = ftps.logout();
+ logger.trace("logOut: {}", logOut);
+ } catch (Exception e) {
+ logger.trace("Unable to logout connection.", e);
+ }
+ try {
ftps.disconnect();
+ logger.trace("disconnected!");
+ } catch (Exception e) {
+ logger.trace("Unable to disconnect connection.", e);
}
- } catch (Exception e) {
- // Do nothing, file has been collected.
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index b1472026..1a581636 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -2,17 +2,15 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
@@ -39,10 +37,16 @@ public interface IFTPSClient {
public int getReplyCode();
+ public void setBufferSize(int bufSize);
+
+ public boolean isConnected();
+
public void disconnect() throws IOException;
public void enterLocalPassiveMode();
+ public void setFileType(int fileType) throws IOException;
+
public void execPBSZ(int newParam) throws IOException;
public void execPROT(String prot) throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b4c52693..6a2c6631 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -23,26 +23,29 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Future;
+
+import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.ssl.SSLContextBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.onap.dcaegen2.collectors.datafile.web.RestTemplateWrapper;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Flux;
@@ -70,7 +73,7 @@ public class DmaapProducerReactiveHttpClient {
private final String pwd;
private IFileSystemResource fileResource;
- private IRestTemplate restTemplate;
+ private CloseableHttpAsyncClient webClient;
/**
* Constructor DmaapProducerReactiveHttpClient.
@@ -78,7 +81,6 @@ public class DmaapProducerReactiveHttpClient {
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
-
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
@@ -97,54 +99,70 @@ public class DmaapProducerReactiveHttpClient {
public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.parseMediaType(dmaapContentType));
- addMetaDataToHead(consumerDmaapModel, headers);
-
- addUserCredentialsToHead(headers);
-
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(consumerDmaapModel.getLocation());
- InputStream fileInputStream = fileSystemResource.getInputStream();
- HttpEntity<byte[]> request = addFileToRequest(fileInputStream, headers);
+ logger.trace("Starting to publish to DR");
+ webClient = getWebClient();
+ webClient.start();
- logger.trace("Starting to publish to DR");
- ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()),
- HttpMethod.PUT, request, String.class);
+ HttpPut put = new HttpPut();
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ addUserCredentialsToHead(put);
- return Flux.just(responseEntity.getStatusCode().toString());
+ Future<HttpResponse> future = webClient.execute(put, null);
+ HttpResponse response = future.get();
+ logger.trace(response.toString());
+ webClient.close();
+ handleHttpResponse(response);
+ return Flux.just(response.toString());
} catch (Exception e) {
logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
return Flux.empty();
}
}
- private void addUserCredentialsToHead(HttpHeaders headers) {
+ private void handleHttpResponse(HttpResponse response) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
+ logger.trace("Publish to DR successful!");
+ } else {
+ logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+ }
+ }
+
+ private void addUserCredentialsToHead(HttpPut put) {
String plainCreds = user + ":" + pwd;
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
logger.trace("base64Creds...: {}", base64Creds);
- headers.add("Authorization", "Basic " + base64Creds);
+ put.addHeader("Authorization", "Basic " + base64Creds);
}
- private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) {
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
- headers.set(X_ATT_DR_META, metaData.toString());
- }
- private HttpEntity<byte[]> addFileToRequest(InputStream inputStream, HttpHeaders headers)
- throws IOException {
- return new HttpEntity<>(IOUtils.toByteArray(inputStream), headers);
+ put.addHeader(X_ATT_DR_META, metaData.toString());
+ put.setURI(getUri(name));
}
- private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
- if (restTemplate == null) {
- restTemplate = new RestTemplateWrapper();
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
+ String fileLocation = model.getLocation();
+ IFileSystemResource fileSystemResource = getFileSystemResource();
+ fileSystemResource.setPath(fileLocation);
+ InputStream fileInputStream = null;
+ try {
+ fileInputStream = fileSystemResource.getInputStream();
+ } catch (IOException e) {
+ logger.error("Unable to get stream from filesystem.", e);
+ }
+ try {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ } catch (IOException e) {
+ logger.error("Unable to set put request body from ByteArray.", e);
}
- return restTemplate;
}
private URI getUri(String fileName) {
@@ -164,7 +182,26 @@ public class DmaapProducerReactiveHttpClient {
fileResource = fileSystemResource;
}
- protected void setRestTemplate(IRestTemplate restTemplate) {
- this.restTemplate = restTemplate;
+ protected CloseableHttpAsyncClient getWebClient() {
+ if (webClient != null) {
+ return webClient;
+ }
+ SSLContext sslContext = null;
+ try {
+ sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+ } catch (Exception e) {
+ logger.trace("Unable to get sslContext.", e);
+ }
+ //@formatter:off
+ return HttpAsyncClients.custom()
+ .setSSLContext(sslContext)
+ .setSSLHostnameVerifier(new NoopHostnameVerifier())
+ .setRedirectStrategy(PublishRedirectStrategy.INSTANCE)
+ .build();
+ //@formatter:on
+ }
+
+ protected void setWebClient(CloseableHttpAsyncClient client) {
+ this.webClient = client;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java
deleted file mode 100644
index 15d459f8..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-package org.onap.dcaegen2.collectors.datafile.web;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpRequestInterceptor;
-import org.springframework.http.client.ClientHttpResponse;
-
-public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor {
-
- private final Logger log = LoggerFactory.getLogger(this.getClass());
-
- @Override
- public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
- logRequest(request, body);
- ClientHttpResponse response = execution.execute(request, body);
- logResponse(response);
- return response;
- }
-
- private void logRequest(HttpRequest request, byte[] body) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("===========================request begin================================================");
- log.debug("URI : {}", request.getURI());
- log.debug("Method : {}", request.getMethod());
- log.debug("Headers : {}", request.getHeaders());
- log.debug("Request body: {}", new String(body, "UTF-8"));
- log.debug("==========================request end================================================");
- }
- }
-
- private void logResponse(ClientHttpResponse response) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("============================response begin==========================================");
- log.debug("Status code : {}", response.getStatusCode());
- log.debug("Status text : {}", response.getStatusText());
- log.debug("Headers : {}", response.getHeaders());
- log.debug("=======================response end=================================================");
- }
- }
-} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java
deleted file mode 100644
index a1b42848..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import java.net.URI;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collections;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * @author
- *
- */
-public class RestTemplateWrapper implements IRestTemplate {
- private RestTemplate restTemplate;
-
- public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
- SSLContext sslContext =
- new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- CloseableHttpClient httpClient =
- HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
- .setRedirectStrategy(new PublishRedirectStrategy()).build();
-
- HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
- requestFactory.setHttpClient(httpClient);
-
- restTemplate = new RestTemplate(requestFactory);
- restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor()));
-
- }
-
- @Override
- public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
- Class<String> responseType) {
- return restTemplate.exchange(url, method, requestEntity, responseType);
- }
-
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
new file mode 100644
index 00000000..0511ad22
--- /dev/null
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import org.junit.jupiter.api.Test;
+
+public class FileCollectResultTest {
+
+ @Test
+ public void successfulResult() {
+ FileCollectResult resultUnderTest = new FileCollectResult();
+ assertTrue(resultUnderTest.downloadSuccessful());
+ assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
+ }
+
+ @Test
+ public void unSuccessfulResult() {
+ ErrorData errorData = new ErrorData();
+ errorData.addError("Error", null);
+ errorData.addError("Null", new NullPointerException());
+ FileCollectResult resultUnderTest = new FileCollectResult(errorData);
+ assertFalse(resultUnderTest.downloadSuccessful());
+ assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(), resultUnderTest.toString());
+ }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index e5693d50..2d59ad51 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -36,6 +36,8 @@ import java.security.KeyStoreException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
@@ -88,7 +90,7 @@ public class FtpsClientTest {
clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
-}
+ }
@Test
public void collectFile_allOk() throws Exception {
@@ -103,6 +105,7 @@ public class FtpsClientTest {
OutputStream osMock = mock(OutputStream.class);
when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
+ when(ftpsClientMock.isConnected()).thenReturn(false, true);
ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
.userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -124,19 +127,22 @@ public class FtpsClientTest {
verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
verify(ftpsClientMock).execPBSZ(0);
verify(ftpsClientMock).execPROT("P");
+ verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+ verify(ftpsClientMock).setBufferSize(1024 * 1024);
verify(localFileMock).setPath(LOCAL_FILE_PATH);
verify(localFileMock, times(1)).createNewFile();
verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
verify(osMock, times(1)).close();
verify(ftpsClientMock, times(1)).logout();
verify(ftpsClientMock, times(1)).disconnect();
+ verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@Test
public void collectFileFaultyOwnKey_shouldFail() throws Exception {
- doThrow(new GeneralSecurityException())
- .when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doThrow(new GeneralSecurityException()).when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH,
+ FTP_KEY_PASSWORD);
ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
.userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -144,6 +150,10 @@ public class FtpsClientTest {
FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock, times(1)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -160,6 +170,15 @@ public class FtpsClientTest {
FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock, times(1)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -175,8 +194,19 @@ public class FtpsClientTest {
FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
- verify(ftpsClientMock, times(1)).logout();
assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock, times(3)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -186,15 +216,27 @@ public class FtpsClientTest {
when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock);
when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
- when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.BAD_REQUEST.value());
+ when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
.userId(USERNAME).password(PASSWORD).port(PORT).build();
FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
- verify(ftpsClientMock, times(1)).disconnect();
assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock, times(2)).getReplyCode();
+ verify(ftpsClientMock, times(3)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -212,6 +254,17 @@ public class FtpsClientTest {
FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock, times(3)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -232,5 +285,68 @@ public class FtpsClientTest {
assertFalse(result.downloadSuccessful());
verify(localFileMock, times(1)).delete();
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock).getReplyCode();
+ verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+ verify(ftpsClientMock).execPBSZ(0);
+ verify(ftpsClientMock).execPROT("P");
+ verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+ verify(ftpsClientMock).setBufferSize(1024 * 1024);
+ verify(localFileMock).setPath(LOCAL_FILE_PATH);
+ verify(localFileMock, times(1)).createNewFile();
+ verify(ftpsClientMock, times(2)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
}
-} \ No newline at end of file
+
+ @Test
+ public void collectFileFailingFileRetrieve_shouldFail() throws Exception {
+ when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock);
+ when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock);
+ when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock);
+ when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
+ when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
+ when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
+ File fileMock = mock(File.class);
+ when(localFileMock.getFile()).thenReturn(fileMock);
+ OutputStream osMock = mock(OutputStream.class);
+ when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
+ when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false);
+
+ ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password(PASSWORD).port(PORT).build();
+
+ FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+
+ assertFalse(result.downloadSuccessful());
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+ verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+ verify(inputStreamMock, times(1)).close();
+ verify(trustManagerFactoryMock).init(keyStoreMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock).getReplyCode();
+ verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+ verify(ftpsClientMock).execPBSZ(0);
+ verify(ftpsClientMock).execPROT("P");
+ verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+ verify(ftpsClientMock).setBufferSize(1024 * 1024);
+ verify(localFileMock).setPath(LOCAL_FILE_PATH);
+ verify(localFileMock, times(1)).createNewFile();
+ verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+ verify(ftpsClientMock, times(2)).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index ba424626..6cf7190c 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.service.producer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.gson.JsonElement;
@@ -30,9 +29,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -40,13 +46,8 @@ import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.springframework.http.HttpEntity;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.test.StepVerifier;
@@ -77,11 +78,12 @@ class DmaapProducerReactiveHttpClientTest {
private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
- private IRestTemplate restTemplateMock = mock(IRestTemplate.class);
- private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class);
+ private CloseableHttpAsyncClient clientMock;
+ private HttpResponse responseMock = mock(HttpResponse.class);
+ private Future<HttpResponse> futureMock = mock(Future.class);
+ private StatusLine statusLine = mock(StatusLine.class);
private InputStream fileStream;
-
@BeforeEach
void setUp() {
when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
@@ -94,47 +96,60 @@ class DmaapProducerReactiveHttpClientTest {
dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
- dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock);
+ clientMock = mock(CloseableHttpAsyncClient.class);
+ dmaapProducerReactiveHttpClient.setWebClient(clientMock);
}
@Test
void getHttpResponse_Success() throws Exception {
- mockWebClientDependantObject();
-
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
- .expectNext(HttpStatus.OK.toString()).verifyComplete();
-
+ mockWebClientDependantObject(true);
URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
.path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
- HttpHeaders headers = new HttpHeaders();
-
- headers.setContentType(MediaType.parseMediaType(APPLICATION_OCTET_STREAM_CONTENT_TYPE));
+ HttpPut httpPut = new HttpPut();
+ httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+
metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
- headers.set(X_ATT_DR_META, metaData.toString());
+ httpPut.addHeader(X_ATT_DR_META, metaData.toString());
+ httpPut.setURI(expectedUri);
String plainCreds = "dradmin" + ":" + "dradmin";
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
- headers.add("Authorization", "Basic " + base64Creds);
+ httpPut.addHeader("Authorization", "Basic " + base64Creds);
fileStream.reset();
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ .expectNext(responseMock.toString()).verifyComplete();
- HttpEntity<byte[]> requestEntity = new HttpEntity<>(IOUtils.toByteArray(fileStream), headers);
verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
- verify(restTemplateMock).exchange(expectedUri, HttpMethod.PUT, requestEntity, String.class);
- verifyNoMoreInteractions(restTemplateMock);
+ InputStream fileInputStream = fileSystemResourceMock.getInputStream();
+ httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+
+ @Test
+ void getHttpResponse_Fail() throws Exception {
+ mockWebClientDependantObject(false);
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ .verifyComplete();
}
- private void mockWebClientDependantObject() throws IOException {
+ private void mockWebClientDependantObject(boolean success)
+ throws IOException, InterruptedException, ExecutionException {
fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
-
- when(restTemplateMock.exchange(any(), any(), any(), any())).thenReturn(responseEntityMock);
- when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK);
+ if (success) {
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+ when(futureMock.get()).thenReturn(responseMock);
+ when(responseMock.getStatusLine()).thenReturn(statusLine);
+ when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+ } else {
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+ when(futureMock.get()).thenThrow(new InterruptedException());
+ }
}
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java
deleted file mode 100644
index b0f5c931..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-
-import org.junit.jupiter.api.Test;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpResponse;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-class RequestResponseLoggingInterceptorTest {
-
- @Test
- void intercept_shouldReturnObject() throws URISyntaxException, IOException {
-
- //given
- RequestResponseLoggingInterceptor requestResponseLoggingInterceptor = new RequestResponseLoggingInterceptor();
-
- ClientHttpRequestExecution execution = mock(ClientHttpRequestExecution.class);
- HttpRequest request = mock(HttpRequest.class);
- ClientHttpResponse response = mock(ClientHttpResponse.class);
-
- byte[] BODY = new byte[] { (byte)0xe0, 0x4f, (byte)0xd0, 0x20, (byte)0xa2 };
- URI uri = new URI("www.someuri.com");
-
- //when
- when(execution.execute(request, BODY)).thenReturn(response);
-
- //then
- assertNotNull(requestResponseLoggingInterceptor.intercept(request, BODY, execution));
- }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java
deleted file mode 100644
index 3a0701ff..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import org.junit.jupiter.api.Test;
-
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-class RestTemplateWrapperTest {
-
- @Test
- void constructor_shouldReturnNotNullObject() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
- RestTemplateWrapper restTemplateWrapper = new RestTemplateWrapper();
- assertNotNull(restTemplateWrapper);
- }
-}
diff --git a/pom.xml b/pom.xml
index 5bd941f5..03bd074a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@
<properties>
<java.version>8</java.version>
<immutable.version>2.7.1</immutable.version>
- <spring.version>5.1.0.RELEASE</spring.version>
+ <spring.version>5.1.2.RELEASE</spring.version>
<spring-boot.version>2.1.0.M4</spring-boot.version>
<tomcat.version>8.5.32</tomcat.version>
<docker.maven.version>1.0.0</docker.maven.version>
@@ -136,6 +136,16 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.1.4</version>
+ </dependency>
+ <dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-SR10</version>