aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKrzysztof Gajewski <krzysztof.gajewski@nokia.com>2020-12-15 11:19:51 +0100
committerKrzysztof Gajewski <krzysztof.gajewski@nokia.com>2020-12-30 11:51:40 +0100
commit42c23b6bfa5e55c8eb5be890de34b94e907ebe89 (patch)
tree87b3f080a6da1e3360c7ef7ceb3031793ca1f07d
parent4da3abb16bf2063a949f0bc4a48af2fac9c46ba5 (diff)
Add HTTP as new protocol to collect files from xNFs
- HTTP basic auth included - small code refactoring related to the task Issue-ID: DCAEGEN2-2527 Signed-off-by: Krzysztof Gajewski <krzysztof.gajewski@nokia.com> Change-Id: I13ec80e996861e14d2c561087c4af3b34d861030
-rw-r--r--datafile-app-server/pom.xml12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java)2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java)2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java)8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java151
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java14
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java1
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java145
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java170
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java6
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java)5
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java8
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java3
-rw-r--r--pom.xml10
22 files changed, 538 insertions, 26 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 62fdf485..950a3b92 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -167,6 +167,18 @@
</dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-bom</artifactId>
+ <version>${projectreactor.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<build>
<resources>
<resource>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java
index d74b10a2..517e3829 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java
@@ -14,7 +14,7 @@
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.commons;
import java.nio.file.Path;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java
index 72623db2..32241fdb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java
@@ -14,7 +14,7 @@
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.commons;
import java.util.Optional;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
index b20feb82..afa3aaea 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
@@ -17,7 +17,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.commons;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -28,10 +28,10 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
*
*/
public enum Scheme {
- FTPES, SFTP;
+ FTPES, SFTP, HTTP;
public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol ";
- public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS and sFTP";
+ public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS, sFTP and HTTP";
/**
* Get a <code>Scheme</code> from a string.
@@ -46,6 +46,8 @@ public enum Scheme {
result = Scheme.FTPES;
} else if ("SFTP".equalsIgnoreCase(schemeString)) {
result = Scheme.SFTP;
+ } else if ("HTTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.HTTP;
} else {
throw new DatafileTaskException(
DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
index a91d46ae..9bacec88 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
@@ -41,6 +41,8 @@ import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index d1685203..e972aa36 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -27,6 +27,8 @@ import java.nio.file.Path;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
new file mode 100644
index 00000000..86bfc210
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.resources.ConnectionProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class DfcHttpClient implements FileCollectClient {
+
+ //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+ private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
+ private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
+ private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
+
+ private final FileServerData fileServerData;
+ private Disposable disposableClient;
+
+ protected HttpClient client;
+
+ public DfcHttpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override public void open() throws DatafileTaskException {
+ logger.trace("Setting httpClient for file download.");
+
+ basicAuthDataPresentOrThrow();
+ this.client = HttpClient.create(pool).keepAlive(true).headers(
+ h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password())));
+
+ logger.trace("httpClient, auth header was set.");
+ }
+
+ private void basicAuthDataPresentOrThrow() throws DatafileTaskException {
+ if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) {
+ throw new DatafileTaskException("Not sufficient basic auth data for file.");
+ }
+ }
+
+ @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("Prepare to collectFile {}", localFile);
+ CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> errorMessage = new AtomicReference<>();
+
+ Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
+ Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
+
+ Flux<InputStream> responseContent = getServerResponse(remoteFile);
+ disposableClient = responseContent.subscribe(onSuccess, onError);
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
+ }
+
+ if (isDownloadFailed(errorMessage)) {
+ throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get());
+ }
+
+ logger.trace("HTTP collectFile OK");
+ }
+
+ protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
+ return (errorMessage.get() != null);
+ }
+
+ @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) {
+ return (Throwable response) -> {
+ errorMessages.set(new Exception("Error in connection has occurred during file download", response));
+ latch.countDown();
+ };
+ }
+
+ @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
+ AtomicReference<Exception> errorMessages) {
+ return (InputStream response) -> {
+ logger.trace("Starting to process response.");
+ try {
+ long numBytes = Files.copy(response, localFile);
+ logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
+ logger.trace("CollectFile fetched: {}", localFile.toString());
+ response.close();
+ } catch (IOException e) {
+ errorMessages.set(new Exception("Error fetching file with", e));
+ } finally {
+ latch.countDown();
+ }
+ };
+ }
+
+ protected Flux<InputStream> getServerResponse(String remoteFile) {
+ return client.get()
+ .uri(prepareUri(remoteFile))
+ .response((responseReceiver, byteBufFlux) -> {
+ logger.trace("HTTP response status - {}", responseReceiver.status());
+ if(isResponseOk(responseReceiver)){
+ return byteBufFlux.aggregate().asInputStream();
+ }
+ return Mono.error(new Throwable("Unexpected server response code - "
+ + responseReceiver.status().toString()));
+ });
+ }
+
+ protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
+ return httpClientResponse.status().code() == 200;
+ }
+
+ @NotNull protected String prepareUri(String remoteFile) {
+ int port = fileServerData.port().isPresent() ? fileServerData.port().get() : HttpUtils.HTTP_DEFAULT_PORT;
+ return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile;
+ }
+
+ @Override public void close() {
+ logger.trace("Starting http client disposal.");
+ disposableClient.dispose();
+ logger.trace("Http client disposed.");
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 8721f61e..1c8f57da 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -27,9 +27,9 @@ import java.util.Optional;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 5371d485..1dca0058 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Modifications Copyright (C) 2020 Nokia. All rights reserved
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,12 +21,20 @@ package org.onap.dcaegen2.collectors.datafile.service;
import org.apache.http.HttpStatus;
+import java.util.Base64;
+
public final class HttpUtils implements HttpStatus {
+ public static final int HTTP_DEFAULT_PORT = 80;
+
private HttpUtils() {
}
public static boolean isSuccessfulResponseCode(Integer statusCode) {
return statusCode >= 200 && statusCode < 300;
}
+
+ public static String basicAuth(String username, String password) {
+ return "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 708865fa..42a0df16 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 4b89f169..e76d4156 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -27,10 +27,11 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -111,12 +112,11 @@ public class FileCollector {
counters.incNoOfFailedFtpAttempts();
return Mono.just(Optional.empty()); // Give up
} catch (DatafileTaskException e) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- e.toString());
+ logger.warn("Failed to download file: {} {}, reason: ", fileData.sourceName(), fileData.name(), e);
counters.incNoOfFailedFtpAttempts();
return Mono.error(e);
} catch (Exception throwable) {
- logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
@@ -128,6 +128,8 @@ public class FileCollector {
return createSftpClient(fileData);
case FTPES:
return createFtpesClient(fileData);
+ case HTTP:
+ return createHttpClient(fileData);
default:
throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
}
@@ -165,4 +167,8 @@ public class FileCollector {
return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert()), config.keyPasswordPath(),
Paths.get(config.trustedCa()), config.trustedCaPasswordPath());
}
+
+ protected FileCollectClient createHttpClient(FileData fileData) {
+ return new DfcHttpClient(fileData.fileServerData());
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
index 8e6ff947..d8daa56a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
@@ -42,6 +42,7 @@ import org.apache.commons.net.ftp.FTPSClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
import org.springframework.http.HttpStatus;
public class FtpesClientTest {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index d50bfc8d..49fd3652 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableSftpConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
new file mode 100644
index 00000000..f49cd391
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import reactor.core.publisher.Flux;
+import reactor.netty.http.client.HttpClientConfig;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DfcHttpClientTest {
+
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+ private static final String XNF_ADDRESS = "127.0.0.1";
+ private static final int PORT = 80;
+
+ @Mock
+ private Path pathMock;
+
+ DfcHttpClient dfcHttpClientSpy;
+
+ private ImmutableFileServerData createFileServerData() {
+ return ImmutableFileServerData.builder()
+ .serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password(PASSWORD)
+ .port(PORT)
+ .build();
+ }
+
+ @BeforeEach
+ public void setup() {
+ dfcHttpClientSpy = spy(new DfcHttpClient(createFileServerData()));
+ }
+
+ @Test
+ public void openConnection_successAuthSetup() throws DatafileTaskException {
+ dfcHttpClientSpy.open();
+ HttpClientConfig config = dfcHttpClientSpy.client.configuration();
+ assertEquals(HttpUtils.basicAuth(USERNAME, PASSWORD), config.headers().get("Authorization"));
+ }
+
+ @Test
+ public void openConnection_failedBasicAuthSetupThrowException() {
+ ImmutableFileServerData serverData = ImmutableFileServerData.builder()
+ .serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password("")
+ .port(PORT)
+ .build();
+
+ DfcHttpClient dfcHttpClientSpy = spy(new DfcHttpClient(serverData));
+
+ assertThatThrownBy(() -> dfcHttpClientSpy.open())
+ .hasMessageContaining("Not sufficient basic auth data for file.");
+ }
+
+ @Test
+ public void prepareUri_UriWithoutPort() {
+ ImmutableFileServerData serverData = ImmutableFileServerData.builder()
+ .serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password(PASSWORD)
+ .build();
+ DfcHttpClient clientNoPortSpy = spy(new DfcHttpClient(serverData));
+ String REMOTE_FILE = "any";
+
+ String retrievedUri = clientNoPortSpy.prepareUri(REMOTE_FILE);
+ assertTrue(retrievedUri.startsWith("http://" + XNF_ADDRESS + ":80"));
+ }
+
+ @Test
+ public void collectFile_AllOk() throws Exception {
+ String REMOTE_FILE = "any";
+ Flux<InputStream> fis = Flux.just(new ByteArrayInputStream("ReturnedString".getBytes()));
+
+ dfcHttpClientSpy.open();
+
+ when(dfcHttpClientSpy.getServerResponse(any())).thenReturn(fis);
+ doReturn(false).when(dfcHttpClientSpy).isDownloadFailed(any());
+
+ dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock);
+ dfcHttpClientSpy.close();
+
+ verify(dfcHttpClientSpy, times(1)).getServerResponse(ArgumentMatchers.eq(REMOTE_FILE));
+ verify(dfcHttpClientSpy, times(1)).processDataFromServer(any(), any(), any());
+ verify(dfcHttpClientSpy, times(1)).isDownloadFailed(any());
+ }
+
+ @Test
+ public void collectFile_No200ResponseWriteToErrorMessage() throws DatafileTaskException {
+ String ERROR_RESPONSE = "This is unexpected message";
+ String REMOTE_FILE = "any";
+ Flux<Throwable> fis = Flux.error(new Throwable(ERROR_RESPONSE));
+
+ dfcHttpClientSpy.open();
+
+ doReturn(fis).when(dfcHttpClientSpy).getServerResponse(any());
+
+ assertThatThrownBy(() -> dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock))
+ .hasMessageContaining("Error occured during datafile download: ");
+ verify(dfcHttpClientSpy, times(1)).getServerResponse(REMOTE_FILE);
+ verify(dfcHttpClientSpy, times(1)).processFailedConnectionWithServer(any(), any());
+ dfcHttpClientSpy.close();
+ }
+
+ @Test
+ public void isResponseOk_validateResponse() {
+ assertTrue(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_OK));
+ assertFalse(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_ANY_NO_OK));
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
new file mode 100644
index 00000000..42ab4b3a
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
@@ -0,0 +1,170 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.cookie.Cookie;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.context.Context;
+import reactor.util.context.ContextView;
+
+import java.util.Map;
+import java.util.Set;
+
+public class HttpClientResponseHelper {
+
+ public static final HttpClientResponse RESPONSE_OK = new HttpClientResponse() {
+
+ @Override
+ public Map<CharSequence, Set<Cookie>> cookies() {
+ return null;
+ }
+
+ @Override
+ public boolean isKeepAlive() {
+ return false;
+ }
+
+ @Override
+ public boolean isWebsocket() {
+ return false;
+ }
+
+ @Override
+ public HttpMethod method() {
+ return null;
+ }
+
+ @Override
+ public String fullPath() {
+ return null;
+ }
+
+ @Override
+ public String uri() {
+ return null;
+ }
+
+ @Override
+ public HttpVersion version() {
+ return null;
+ }
+
+ @Override
+ public Context currentContext() {
+ return null;
+ }
+
+ @Override
+ public ContextView currentContextView() {
+ return null;
+ }
+
+ @Override
+ public String[] redirectedFrom() {
+ return new String[0];
+ }
+
+ @Override
+ public HttpHeaders requestHeaders() {
+ return null;
+ }
+
+ @Override
+ public String resourceUrl() {
+ return null;
+ }
+
+ @Override
+ public HttpHeaders responseHeaders() {
+ return null;
+ }
+
+ @Override
+ public HttpResponseStatus status() {
+ return HttpResponseStatus.OK;
+ }
+ };
+
+ public static final HttpClientResponse RESPONSE_ANY_NO_OK = new HttpClientResponse() {
+
+ @Override
+ public Map<CharSequence, Set<Cookie>> cookies() {
+ return null;
+ }
+
+ @Override
+ public boolean isKeepAlive() {
+ return false;
+ }
+
+ @Override
+ public boolean isWebsocket() {
+ return false;
+ }
+
+ @Override
+ public HttpMethod method() {
+ return null;
+ }
+
+ @Override
+ public String fullPath() {
+ return null;
+ }
+
+ @Override
+ public String uri() {
+ return null;
+ }
+
+ @Override
+ public HttpVersion version() {
+ return null;
+ }
+
+ @Override public Context currentContext() {
+ return null;
+ }
+
+ @Override public ContextView currentContextView() {
+ return null;
+ }
+
+ @Override public String[] redirectedFrom() {
+ return new String[0];
+ }
+
+ @Override public HttpHeaders requestHeaders() {
+ return null;
+ }
+
+ @Override public String resourceUrl() {
+ return null;
+ }
+
+ @Override public HttpHeaders responseHeaders() {
+ return null;
+ }
+
+ @Override public HttpResponseStatus status() {
+ return HttpResponseStatus.NOT_IMPLEMENTED;
+ }
+ };
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index feeeb474..a446050e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -24,9 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
public class FileDataTest {
private static final String FTPES_SCHEME = "ftpes://";
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
index b695f106..fb475791 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
@@ -17,13 +17,15 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.scheme;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
public class SchemeTest {
@@ -31,6 +33,7 @@ public class SchemeTest {
public void shouldReturnSchemeForSupportedProtocol() throws DatafileTaskException {
assertEquals(Scheme.FTPES, Scheme.getSchemeFromString("FTPES"));
assertEquals(Scheme.SFTP, Scheme.getSchemeFromString("SFTP"));
+ assertEquals(Scheme.HTTP, Scheme.getSchemeFromString("HTTP"));
}
@Test
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 9e642b7d..c7ef8dac 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -35,7 +35,7 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -217,7 +217,7 @@ class JsonMessageParserTest {
void whenPassingCorrectJsonWrongScheme_noMessage() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
- .location("http://location.xml") //
+ .location("unreal://location.xml") //
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
@@ -242,8 +242,8 @@ class JsonMessageParserTest {
assertTrue(logAppender.list.toString()
.contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
- + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "http" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE
- + ". Location: http://location.xml"),"Error missing in log");
+ + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "unreal" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE
+ + ". Location: unreal://location.xml"),"Error missing in log");
assertTrue(logAppender.list.toString().contains("sourceName=5GRAN_DU"),"Missing sourceName in log");
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index f0c8e3b3..e68913f5 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index a98e2baf..1146cf26 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -42,7 +42,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index d5a9a92c..09d7627e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -55,7 +55,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration
import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -74,7 +74,6 @@ import org.slf4j.MDC;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import reactor.test.scheduler.VirtualTimeScheduler;
public class ScheduledTasksTest {
diff --git a/pom.xml b/pom.xml
index 023f4678..8fb44432 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
<spring-boot.version>2.4.0</spring-boot.version>
<commons-io.version>1.3.2</commons-io.version>
<commons-net.version>3.3</commons-net.version>
+ <projectreactor.version>2020.0.2</projectreactor.version>
<!-- LOGGING SETTINGS -->
<slf4j.version>1.7.25</slf4j.version>
@@ -71,7 +72,7 @@
<!-- Plugin versions -->
<maven-resources-plugin.version>3.1.0</maven-resources-plugin.version>
<maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version>
- <docker-maven-plugin.version>1.1.1</docker-maven-plugin.version>
+ <docker-maven-plugin.version>1.2.1</docker-maven-plugin.version>
<git-commit-id-plugin.version>2.2.4</git-commit-id-plugin.version>
<sonar.coverage.jacoco.xmlReportPaths>
${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
@@ -229,6 +230,13 @@
<artifactId>springfox-swagger-ui</artifactId>
<version>${springfox.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-bom</artifactId>
+ <version>${projectreactor.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
</dependencies>
</dependencyManagement>