diff options
author | Krzysztof Gajewski <krzysztof.gajewski@nokia.com> | 2020-12-15 11:19:51 +0100 |
---|---|---|
committer | Krzysztof Gajewski <krzysztof.gajewski@nokia.com> | 2020-12-30 11:51:40 +0100 |
commit | 42c23b6bfa5e55c8eb5be890de34b94e907ebe89 (patch) | |
tree | 87b3f080a6da1e3360c7ef7ceb3031793ca1f07d | |
parent | 4da3abb16bf2063a949f0bc4a48af2fac9c46ba5 (diff) |
Add HTTP as new protocol to collect files from xNFs
- HTTP basic auth included
- small code refactoring related to the task
Issue-ID: DCAEGEN2-2527
Signed-off-by: Krzysztof Gajewski <krzysztof.gajewski@nokia.com>
Change-Id: I13ec80e996861e14d2c561087c4af3b34d861030
22 files changed, 538 insertions, 26 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index 62fdf485..950a3b92 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -167,6 +167,18 @@ </dependency> </dependencies> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-bom</artifactId> + <version>${projectreactor.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + <build> <resources> <resource> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java index d74b10a2..517e3829 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java @@ -14,7 +14,7 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import java.nio.file.Path; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java index 72623db2..32241fdb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java @@ -14,7 +14,7 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import java.util.Optional; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java index b20feb82..afa3aaea 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java @@ -17,7 +17,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -28,10 +28,10 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; * */ public enum Scheme { - FTPES, SFTP; + FTPES, SFTP, HTTP; public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol "; - public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS and sFTP"; + public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS, sFTP and HTTP"; /** * Get a <code>Scheme</code> from a string. @@ -46,6 +46,8 @@ public enum Scheme { result = Scheme.FTPES; } else if ("SFTP".equalsIgnoreCase(schemeString)) { result = Scheme.SFTP; + } else if ("HTTP".equalsIgnoreCase(schemeString)) { + result = Scheme.HTTP; } else { throw new DatafileTaskException( DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java index a91d46ae..9bacec88 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java @@ -41,6 +41,8 @@ import javax.net.ssl.TrustManagerFactory; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index d1685203..e972aa36 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -27,6 +27,8 @@ import java.nio.file.Path; import java.util.Optional; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java new file mode 100644 index 00000000..86bfc210 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2020 Nokia. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.http; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.resources.ConnectionProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class DfcHttpClient implements FileCollectClient { + + //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS + private static final int MAX_NUMBER_OF_CONNECTIONS = 200; + private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class); + private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS); + + private final FileServerData fileServerData; + private Disposable disposableClient; + + protected HttpClient client; + + public DfcHttpClient(FileServerData fileServerData) { + this.fileServerData = fileServerData; + } + + @Override public void open() throws DatafileTaskException { + logger.trace("Setting httpClient for file download."); + + basicAuthDataPresentOrThrow(); + this.client = HttpClient.create(pool).keepAlive(true).headers( + h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password()))); + + logger.trace("httpClient, auth header was set."); + } + + private void basicAuthDataPresentOrThrow() throws DatafileTaskException { + if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) { + throw new DatafileTaskException("Not sufficient basic auth data for file."); + } + } + + @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { + logger.trace("Prepare to collectFile {}", localFile); + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Exception> errorMessage = new AtomicReference<>(); + + Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage); + Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage); + + Flux<InputStream> responseContent = getServerResponse(remoteFile); + disposableClient = responseContent.subscribe(onSuccess, onError); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new DatafileTaskException("Interrupted exception after datafile download - ", e); + } + + if (isDownloadFailed(errorMessage)) { + throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get()); + } + + logger.trace("HTTP collectFile OK"); + } + + protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) { + return (errorMessage.get() != null); + } + + @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) { + return (Throwable response) -> { + errorMessages.set(new Exception("Error in connection has occurred during file download", response)); + latch.countDown(); + }; + } + + @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch, + AtomicReference<Exception> errorMessages) { + return (InputStream response) -> { + logger.trace("Starting to process response."); + try { + long numBytes = Files.copy(response, localFile); + logger.trace("Transmission was successful - {} bytes downloaded.", numBytes); + logger.trace("CollectFile fetched: {}", localFile.toString()); + response.close(); + } catch (IOException e) { + errorMessages.set(new Exception("Error fetching file with", e)); + } finally { + latch.countDown(); + } + }; + } + + protected Flux<InputStream> getServerResponse(String remoteFile) { + return client.get() + .uri(prepareUri(remoteFile)) + .response((responseReceiver, byteBufFlux) -> { + logger.trace("HTTP response status - {}", responseReceiver.status()); + if(isResponseOk(responseReceiver)){ + return byteBufFlux.aggregate().asInputStream(); + } + return Mono.error(new Throwable("Unexpected server response code - " + + responseReceiver.status().toString())); + }); + } + + protected boolean isResponseOk(HttpClientResponse httpClientResponse) { + return httpClientResponse.status().code() == 200; + } + + @NotNull protected String prepareUri(String remoteFile) { + int port = fileServerData.port().isPresent() ? fileServerData.port().get() : HttpUtils.HTTP_DEFAULT_PORT; + return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile; + } + + @Override public void close() { + logger.trace("Starting http client disposal."); + disposableClient.dispose(); + logger.trace("Http client disposed."); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 8721f61e..1c8f57da 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -27,9 +27,9 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java index 5371d485..1dca0058 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START====================================================================== * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * Modifications Copyright (C) 2020 Nokia. All rights reserved * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +21,20 @@ package org.onap.dcaegen2.collectors.datafile.service; import org.apache.http.HttpStatus; +import java.util.Base64; + public final class HttpUtils implements HttpStatus { + public static final int HTTP_DEFAULT_PORT = 80; + private HttpUtils() { } public static boolean isSuccessfulResponseCode(Integer statusCode) { return statusCode >= 200 && statusCode < 300; } + + public static String basicAuth(String username, String password) { + return "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes()); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 708865fa..42a0df16 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 4b89f169..e76d4156 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -27,10 +27,11 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings; +import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient; import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -111,12 +112,11 @@ public class FileCollector { counters.incNoOfFailedFtpAttempts(); return Mono.just(Optional.empty()); // Give up } catch (DatafileTaskException e) { - logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - e.toString()); + logger.warn("Failed to download file: {} {}, reason: ", fileData.sourceName(), fileData.name(), e); counters.incNoOfFailedFtpAttempts(); return Mono.error(e); } catch (Exception throwable) { - logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } @@ -128,6 +128,8 @@ public class FileCollector { return createSftpClient(fileData); case FTPES: return createFtpesClient(fileData); + case HTTP: + return createHttpClient(fileData); default: throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme()); } @@ -165,4 +167,8 @@ public class FileCollector { return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert()), config.keyPasswordPath(), Paths.get(config.trustedCa()), config.trustedCaPasswordPath()); } + + protected FileCollectClient createHttpClient(FileData fileData) { + return new DfcHttpClient(fileData.fileServerData()); + } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java index 8e6ff947..d8daa56a 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java @@ -42,6 +42,7 @@ import org.apache.commons.net.ftp.FTPSClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; import org.springframework.http.HttpStatus; public class FtpesClientTest { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index d50bfc8d..49fd3652 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableSftpConfig; import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java new file mode 100644 index 00000000..f49cd391 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java @@ -0,0 +1,145 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2020 Nokia. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.http; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import reactor.core.publisher.Flux; +import reactor.netty.http.client.HttpClientConfig; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class DfcHttpClientTest { + + private static final String USERNAME = "bob"; + private static final String PASSWORD = "123"; + private static final String XNF_ADDRESS = "127.0.0.1"; + private static final int PORT = 80; + + @Mock + private Path pathMock; + + DfcHttpClient dfcHttpClientSpy; + + private ImmutableFileServerData createFileServerData() { + return ImmutableFileServerData.builder() + .serverAddress(XNF_ADDRESS) + .userId(USERNAME).password(PASSWORD) + .port(PORT) + .build(); + } + + @BeforeEach + public void setup() { + dfcHttpClientSpy = spy(new DfcHttpClient(createFileServerData())); + } + + @Test + public void openConnection_successAuthSetup() throws DatafileTaskException { + dfcHttpClientSpy.open(); + HttpClientConfig config = dfcHttpClientSpy.client.configuration(); + assertEquals(HttpUtils.basicAuth(USERNAME, PASSWORD), config.headers().get("Authorization")); + } + + @Test + public void openConnection_failedBasicAuthSetupThrowException() { + ImmutableFileServerData serverData = ImmutableFileServerData.builder() + .serverAddress(XNF_ADDRESS) + .userId(USERNAME).password("") + .port(PORT) + .build(); + + DfcHttpClient dfcHttpClientSpy = spy(new DfcHttpClient(serverData)); + + assertThatThrownBy(() -> dfcHttpClientSpy.open()) + .hasMessageContaining("Not sufficient basic auth data for file."); + } + + @Test + public void prepareUri_UriWithoutPort() { + ImmutableFileServerData serverData = ImmutableFileServerData.builder() + .serverAddress(XNF_ADDRESS) + .userId(USERNAME).password(PASSWORD) + .build(); + DfcHttpClient clientNoPortSpy = spy(new DfcHttpClient(serverData)); + String REMOTE_FILE = "any"; + + String retrievedUri = clientNoPortSpy.prepareUri(REMOTE_FILE); + assertTrue(retrievedUri.startsWith("http://" + XNF_ADDRESS + ":80")); + } + + @Test + public void collectFile_AllOk() throws Exception { + String REMOTE_FILE = "any"; + Flux<InputStream> fis = Flux.just(new ByteArrayInputStream("ReturnedString".getBytes())); + + dfcHttpClientSpy.open(); + + when(dfcHttpClientSpy.getServerResponse(any())).thenReturn(fis); + doReturn(false).when(dfcHttpClientSpy).isDownloadFailed(any()); + + dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock); + dfcHttpClientSpy.close(); + + verify(dfcHttpClientSpy, times(1)).getServerResponse(ArgumentMatchers.eq(REMOTE_FILE)); + verify(dfcHttpClientSpy, times(1)).processDataFromServer(any(), any(), any()); + verify(dfcHttpClientSpy, times(1)).isDownloadFailed(any()); + } + + @Test + public void collectFile_No200ResponseWriteToErrorMessage() throws DatafileTaskException { + String ERROR_RESPONSE = "This is unexpected message"; + String REMOTE_FILE = "any"; + Flux<Throwable> fis = Flux.error(new Throwable(ERROR_RESPONSE)); + + dfcHttpClientSpy.open(); + + doReturn(fis).when(dfcHttpClientSpy).getServerResponse(any()); + + assertThatThrownBy(() -> dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock)) + .hasMessageContaining("Error occured during datafile download: "); + verify(dfcHttpClientSpy, times(1)).getServerResponse(REMOTE_FILE); + verify(dfcHttpClientSpy, times(1)).processFailedConnectionWithServer(any(), any()); + dfcHttpClientSpy.close(); + } + + @Test + public void isResponseOk_validateResponse() { + assertTrue(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_OK)); + assertFalse(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_ANY_NO_OK)); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java new file mode 100644 index 00000000..42ab4b3a --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java @@ -0,0 +1,170 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2020 Nokia. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.http; + +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.cookie.Cookie; +import reactor.netty.http.client.HttpClientResponse; +import reactor.util.context.Context; +import reactor.util.context.ContextView; + +import java.util.Map; +import java.util.Set; + +public class HttpClientResponseHelper { + + public static final HttpClientResponse RESPONSE_OK = new HttpClientResponse() { + + @Override + public Map<CharSequence, Set<Cookie>> cookies() { + return null; + } + + @Override + public boolean isKeepAlive() { + return false; + } + + @Override + public boolean isWebsocket() { + return false; + } + + @Override + public HttpMethod method() { + return null; + } + + @Override + public String fullPath() { + return null; + } + + @Override + public String uri() { + return null; + } + + @Override + public HttpVersion version() { + return null; + } + + @Override + public Context currentContext() { + return null; + } + + @Override + public ContextView currentContextView() { + return null; + } + + @Override + public String[] redirectedFrom() { + return new String[0]; + } + + @Override + public HttpHeaders requestHeaders() { + return null; + } + + @Override + public String resourceUrl() { + return null; + } + + @Override + public HttpHeaders responseHeaders() { + return null; + } + + @Override + public HttpResponseStatus status() { + return HttpResponseStatus.OK; + } + }; + + public static final HttpClientResponse RESPONSE_ANY_NO_OK = new HttpClientResponse() { + + @Override + public Map<CharSequence, Set<Cookie>> cookies() { + return null; + } + + @Override + public boolean isKeepAlive() { + return false; + } + + @Override + public boolean isWebsocket() { + return false; + } + + @Override + public HttpMethod method() { + return null; + } + + @Override + public String fullPath() { + return null; + } + + @Override + public String uri() { + return null; + } + + @Override + public HttpVersion version() { + return null; + } + + @Override public Context currentContext() { + return null; + } + + @Override public ContextView currentContextView() { + return null; + } + + @Override public String[] redirectedFrom() { + return new String[0]; + } + + @Override public HttpHeaders requestHeaders() { + return null; + } + + @Override public String resourceUrl() { + return null; + } + + @Override public HttpHeaders responseHeaders() { + return null; + } + + @Override public HttpResponseStatus status() { + return HttpResponseStatus.NOT_IMPLEMENTED; + } + }; +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java index feeeb474..a446050e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -24,9 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; public class FileDataTest { private static final String FTPES_SCHEME = "ftpes://"; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java index b695f106..fb475791 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java @@ -17,13 +17,15 @@ * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.scheme; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; public class SchemeTest { @@ -31,6 +33,7 @@ public class SchemeTest { public void shouldReturnSchemeForSupportedProtocol() throws DatafileTaskException { assertEquals(Scheme.FTPES, Scheme.getSchemeFromString("FTPES")); assertEquals(Scheme.SFTP, Scheme.getSchemeFromString("SFTP")); + assertEquals(Scheme.HTTP, Scheme.getSchemeFromString("HTTP")); } @Test diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 9e642b7d..c7ef8dac 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -35,7 +35,7 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -217,7 +217,7 @@ class JsonMessageParserTest { void whenPassingCorrectJsonWrongScheme_noMessage() { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // - .location("http://location.xml") // + .location("unreal://location.xml") // .compression(GZIP_COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // @@ -242,8 +242,8 @@ class JsonMessageParserTest { assertTrue(logAppender.list.toString() .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING - + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "http" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE - + ". Location: http://location.xml"),"Error missing in log"); + + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "unreal" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE + + ". Location: unreal://location.xml"),"Error missing in log"); assertTrue(logAppender.list.toString().contains("sourceName=5GRAN_DU"),"Missing sourceName in log"); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index f0c8e3b3..e68913f5 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index a98e2baf..1146cf26 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -42,7 +42,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index d5a9a92c..09d7627e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -55,7 +55,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -74,7 +74,6 @@ import org.slf4j.MDC; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import reactor.test.scheduler.VirtualTimeScheduler; public class ScheduledTasksTest { @@ -53,6 +53,7 @@ <spring-boot.version>2.4.0</spring-boot.version> <commons-io.version>1.3.2</commons-io.version> <commons-net.version>3.3</commons-net.version> + <projectreactor.version>2020.0.2</projectreactor.version> <!-- LOGGING SETTINGS --> <slf4j.version>1.7.25</slf4j.version> @@ -71,7 +72,7 @@ <!-- Plugin versions --> <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> <maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version> - <docker-maven-plugin.version>1.1.1</docker-maven-plugin.version> + <docker-maven-plugin.version>1.2.1</docker-maven-plugin.version> <git-commit-id-plugin.version>2.2.4</git-commit-id-plugin.version> <sonar.coverage.jacoco.xmlReportPaths> ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml @@ -229,6 +230,13 @@ <artifactId>springfox-swagger-ui</artifactId> <version>${springfox.version}</version> </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-bom</artifactId> + <version>${projectreactor.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> </dependencies> </dependencyManagement> |