aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java151
1 files changed, 151 insertions, 0 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
new file mode 100644
index 00000000..86bfc210
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.resources.ConnectionProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class DfcHttpClient implements FileCollectClient {
+
+ //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+ private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
+ private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
+ private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
+
+ private final FileServerData fileServerData;
+ private Disposable disposableClient;
+
+ protected HttpClient client;
+
+ public DfcHttpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override public void open() throws DatafileTaskException {
+ logger.trace("Setting httpClient for file download.");
+
+ basicAuthDataPresentOrThrow();
+ this.client = HttpClient.create(pool).keepAlive(true).headers(
+ h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password())));
+
+ logger.trace("httpClient, auth header was set.");
+ }
+
+ private void basicAuthDataPresentOrThrow() throws DatafileTaskException {
+ if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) {
+ throw new DatafileTaskException("Not sufficient basic auth data for file.");
+ }
+ }
+
+ @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("Prepare to collectFile {}", localFile);
+ CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> errorMessage = new AtomicReference<>();
+
+ Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
+ Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
+
+ Flux<InputStream> responseContent = getServerResponse(remoteFile);
+ disposableClient = responseContent.subscribe(onSuccess, onError);
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
+ }
+
+ if (isDownloadFailed(errorMessage)) {
+ throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get());
+ }
+
+ logger.trace("HTTP collectFile OK");
+ }
+
+ protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
+ return (errorMessage.get() != null);
+ }
+
+ @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) {
+ return (Throwable response) -> {
+ errorMessages.set(new Exception("Error in connection has occurred during file download", response));
+ latch.countDown();
+ };
+ }
+
+ @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
+ AtomicReference<Exception> errorMessages) {
+ return (InputStream response) -> {
+ logger.trace("Starting to process response.");
+ try {
+ long numBytes = Files.copy(response, localFile);
+ logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
+ logger.trace("CollectFile fetched: {}", localFile.toString());
+ response.close();
+ } catch (IOException e) {
+ errorMessages.set(new Exception("Error fetching file with", e));
+ } finally {
+ latch.countDown();
+ }
+ };
+ }
+
+ protected Flux<InputStream> getServerResponse(String remoteFile) {
+ return client.get()
+ .uri(prepareUri(remoteFile))
+ .response((responseReceiver, byteBufFlux) -> {
+ logger.trace("HTTP response status - {}", responseReceiver.status());
+ if(isResponseOk(responseReceiver)){
+ return byteBufFlux.aggregate().asInputStream();
+ }
+ return Mono.error(new Throwable("Unexpected server response code - "
+ + responseReceiver.status().toString()));
+ });
+ }
+
+ protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
+ return httpClientResponse.status().code() == 200;
+ }
+
+ @NotNull protected String prepareUri(String remoteFile) {
+ int port = fileServerData.port().isPresent() ? fileServerData.port().get() : HttpUtils.HTTP_DEFAULT_PORT;
+ return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile;
+ }
+
+ @Override public void close() {
+ logger.trace("Starting http client disposal.");
+ disposableClient.dispose();
+ logger.trace("Http client disposed.");
+ }
+}