/*- * ============LICENSE_START====================================================================== * Copyright (C) 2020-2021 Nokia. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.http; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.commons.FileServerData; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientResponse; import reactor.netty.resources.ConnectionProvider; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** * Gets file from PNF with HTTP protocol. * * @author Krzysztof Gajewski */ public class DfcHttpClient implements FileCollectClient { //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS private static final int MAX_NUMBER_OF_CONNECTIONS = 200; private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class); private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS); private final FileServerData fileServerData; private Disposable disposableClient; protected HttpClient client; public DfcHttpClient(FileServerData fileServerData) { this.fileServerData = fileServerData; } @Override public void open() throws DatafileTaskException { logger.trace("Setting httpClient for file download."); basicAuthDataPresentOrThrow(); this.client = HttpClient.create(pool).keepAlive(true).headers( h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password()))); logger.trace("httpClient, auth header was set."); } private void basicAuthDataPresentOrThrow() throws DatafileTaskException { if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) { throw new DatafileTaskException("Not sufficient basic auth data for file."); } } @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { logger.trace("Prepare to collectFile {}", localFile); CountDownLatch latch = new CountDownLatch(1); final AtomicReference errorMessage = new AtomicReference<>(); Consumer onError = processFailedConnectionWithServer(latch, errorMessage); Consumer onSuccess = processDataFromServer(localFile, latch, errorMessage); Flux responseContent = getServerResponse(remoteFile); disposableClient = responseContent.subscribe(onSuccess, onError); try { latch.await(); } catch (InterruptedException e) { throw new DatafileTaskException("Interrupted exception after datafile download - ", e); } if (isDownloadFailed(errorMessage)) { throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get()); } logger.trace("HTTP collectFile OK"); } protected boolean isDownloadFailed(AtomicReference errorMessage) { return (errorMessage.get() != null); } @NotNull protected Consumer processFailedConnectionWithServer(CountDownLatch latch, AtomicReference errorMessages) { return (Throwable response) -> { errorMessages.set(new Exception("Error in connection has occurred during file download", response)); latch.countDown(); }; } @NotNull protected Consumer processDataFromServer(Path localFile, CountDownLatch latch, AtomicReference errorMessages) { return (InputStream response) -> { logger.trace("Starting to process response."); try { long numBytes = Files.copy(response, localFile); logger.trace("Transmission was successful - {} bytes downloaded.", numBytes); logger.trace("CollectFile fetched: {}", localFile); response.close(); } catch (IOException e) { errorMessages.set(new Exception("Error fetching file with", e)); } finally { latch.countDown(); } }; } protected Flux getServerResponse(String remoteFile) { return client.get() .uri(prepareUri(remoteFile)) .response((responseReceiver, byteBufFlux) -> { logger.trace("HTTP response status - {}", responseReceiver.status()); if(isResponseOk(responseReceiver)){ return byteBufFlux.aggregate().asInputStream(); } return Mono.error(new Throwable("Unexpected server response code - " + responseReceiver.status().toString())); }); } protected boolean isResponseOk(HttpClientResponse httpClientResponse) { return httpClientResponse.status().code() == 200; } @NotNull protected String prepareUri(String remoteFile) { int port = fileServerData.port().orElse(HttpUtils.HTTP_DEFAULT_PORT); return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile; } @Override public void close() { logger.trace("Starting http client disposal."); disposableClient.dispose(); logger.trace("Http client disposed."); } }