diff options
Diffstat (limited to 'datafile-app-server/src/main')
10 files changed, 185 insertions, 13 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java index d74b10a2..517e3829 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java @@ -14,7 +14,7 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import java.nio.file.Path; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java index 72623db2..32241fdb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java @@ -14,7 +14,7 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import java.util.Optional; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java index b20feb82..afa3aaea 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java @@ -17,7 +17,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.commons; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -28,10 +28,10 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; * */ public enum Scheme { - FTPES, SFTP; + FTPES, SFTP, HTTP; public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol "; - public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS and sFTP"; + public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS, sFTP and HTTP"; /** * Get a <code>Scheme</code> from a string. @@ -46,6 +46,8 @@ public enum Scheme { result = Scheme.FTPES; } else if ("SFTP".equalsIgnoreCase(schemeString)) { result = Scheme.SFTP; + } else if ("HTTP".equalsIgnoreCase(schemeString)) { + result = Scheme.HTTP; } else { throw new DatafileTaskException( DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java index a91d46ae..9bacec88 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java @@ -41,6 +41,8 @@ import javax.net.ssl.TrustManagerFactory; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index d1685203..e972aa36 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -27,6 +27,8 @@ import java.nio.file.Path; import java.util.Optional; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java new file mode 100644 index 00000000..86bfc210 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2020 Nokia. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.http; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.resources.ConnectionProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class DfcHttpClient implements FileCollectClient { + + //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS + private static final int MAX_NUMBER_OF_CONNECTIONS = 200; + private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class); + private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS); + + private final FileServerData fileServerData; + private Disposable disposableClient; + + protected HttpClient client; + + public DfcHttpClient(FileServerData fileServerData) { + this.fileServerData = fileServerData; + } + + @Override public void open() throws DatafileTaskException { + logger.trace("Setting httpClient for file download."); + + basicAuthDataPresentOrThrow(); + this.client = HttpClient.create(pool).keepAlive(true).headers( + h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password()))); + + logger.trace("httpClient, auth header was set."); + } + + private void basicAuthDataPresentOrThrow() throws DatafileTaskException { + if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) { + throw new DatafileTaskException("Not sufficient basic auth data for file."); + } + } + + @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { + logger.trace("Prepare to collectFile {}", localFile); + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Exception> errorMessage = new AtomicReference<>(); + + Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage); + Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage); + + Flux<InputStream> responseContent = getServerResponse(remoteFile); + disposableClient = responseContent.subscribe(onSuccess, onError); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new DatafileTaskException("Interrupted exception after datafile download - ", e); + } + + if (isDownloadFailed(errorMessage)) { + throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get()); + } + + logger.trace("HTTP collectFile OK"); + } + + protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) { + return (errorMessage.get() != null); + } + + @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) { + return (Throwable response) -> { + errorMessages.set(new Exception("Error in connection has occurred during file download", response)); + latch.countDown(); + }; + } + + @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch, + AtomicReference<Exception> errorMessages) { + return (InputStream response) -> { + logger.trace("Starting to process response."); + try { + long numBytes = Files.copy(response, localFile); + logger.trace("Transmission was successful - {} bytes downloaded.", numBytes); + logger.trace("CollectFile fetched: {}", localFile.toString()); + response.close(); + } catch (IOException e) { + errorMessages.set(new Exception("Error fetching file with", e)); + } finally { + latch.countDown(); + } + }; + } + + protected Flux<InputStream> getServerResponse(String remoteFile) { + return client.get() + .uri(prepareUri(remoteFile)) + .response((responseReceiver, byteBufFlux) -> { + logger.trace("HTTP response status - {}", responseReceiver.status()); + if(isResponseOk(responseReceiver)){ + return byteBufFlux.aggregate().asInputStream(); + } + return Mono.error(new Throwable("Unexpected server response code - " + + responseReceiver.status().toString())); + }); + } + + protected boolean isResponseOk(HttpClientResponse httpClientResponse) { + return httpClientResponse.status().code() == 200; + } + + @NotNull protected String prepareUri(String remoteFile) { + int port = fileServerData.port().isPresent() ? fileServerData.port().get() : HttpUtils.HTTP_DEFAULT_PORT; + return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile; + } + + @Override public void close() { + logger.trace("Starting http client disposal."); + disposableClient.dispose(); + logger.trace("Http client disposed."); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 8721f61e..1c8f57da 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -27,9 +27,9 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java index 5371d485..1dca0058 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START====================================================================== * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * Modifications Copyright (C) 2020 Nokia. All rights reserved * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +21,20 @@ package org.onap.dcaegen2.collectors.datafile.service; import org.apache.http.HttpStatus; +import java.util.Base64; + public final class HttpUtils implements HttpStatus { + public static final int HTTP_DEFAULT_PORT = 80; + private HttpUtils() { } public static boolean isSuccessfulResponseCode(Integer statusCode) { return statusCode >= 200 && statusCode < 300; } + + public static String basicAuth(String username, String password) { + return "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes()); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 708865fa..42a0df16 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; -import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.commons.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 4b89f169..e76d4156 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -27,10 +27,11 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings; +import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient; import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -111,12 +112,11 @@ public class FileCollector { counters.incNoOfFailedFtpAttempts(); return Mono.just(Optional.empty()); // Give up } catch (DatafileTaskException e) { - logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - e.toString()); + logger.warn("Failed to download file: {} {}, reason: ", fileData.sourceName(), fileData.name(), e); counters.incNoOfFailedFtpAttempts(); return Mono.error(e); } catch (Exception throwable) { - logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } @@ -128,6 +128,8 @@ public class FileCollector { return createSftpClient(fileData); case FTPES: return createFtpesClient(fileData); + case HTTP: + return createHttpClient(fileData); default: throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme()); } @@ -165,4 +167,8 @@ public class FileCollector { return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert()), config.keyPasswordPath(), Paths.get(config.trustedCa()), config.trustedCaPasswordPath()); } + + protected FileCollectClient createHttpClient(FileData fileData) { + return new DfcHttpClient(fileData.fileServerData()); + } } |