diff options
17 files changed, 109 insertions, 435 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3c606deb..a8f79ea1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -123,12 +123,11 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transformMessages(monoJsonP)); + return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Flux<FileReadyMessage> transformMessages(JsonObject message) { + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); @@ -138,22 +137,22 @@ public class JsonMessageParser { if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); // @formatter:off - return Flux.just(ImmutableFileReadyMessage.builder() + return Mono.just(ImmutableFileReadyMessage.builder() .pnfName(messageMetaData.sourceName()) .messageMetaData(messageMetaData) .files(allFileDataFromJson) .build()); // @formatter:on } else { - return Flux.empty(); + return Mono.empty(); } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); + return Mono.empty(); } logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); + return Mono.empty(); } private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 338c8323..4c0dcce5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -50,27 +50,27 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off - return Flux.just(model) - .cache(1) + return Mono.just(model) + .cache() .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Flux.just(model); + return Mono.just(model); } else { - logger.warn("Publish to DR unsuccessful, response code: " + response); - return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + logger.warn("Publish to DR unsuccessful, response code: {}", response); + return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index f22c7bf9..37b7a559 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -98,7 +98,7 @@ public class ScheduledTasks { .doOnNext(fileData -> taskCounter.incrementAndGet()) .flatMap(this::collectFileFromXnf) .flatMap(this::publishToDataRouter) - .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) .doOnNext(model -> taskCounter.decrementAndGet()) .sequential() .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -109,8 +109,8 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(Path localFile) { - logger.info("Datafile consumed tasks." + localFile); + private void onSuccess(ConsumerDmaapModel model) { + logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } private void onError(Throwable throwable) { @@ -138,18 +138,19 @@ public class ScheduledTasks { return fileCollect.collectorTask .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { - logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); - deleteFile(fileData.getLocalFileName()); - alreadyPublishedFiles.remove(fileData.getLocalFileName()); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) { + Path localFileName = fileData.getLocalFileName(); + logger.error("File fetching failed: {}", localFileName); + deleteFile(localFileName); + alreadyPublishedFiles.remove(localFileName); taskCounter.decrementAndGet(); return Mono.empty(); } - private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); @@ -160,13 +161,13 @@ public class ScheduledTasks { } - private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); taskCounter.decrementAndGet(); - return Flux.empty(); + return Mono.empty(); } private Flux<FileReadyMessage> consumeMessagesFromDmaap() { @@ -179,7 +180,7 @@ public class ScheduledTasks { final DMaaPMessageConsumerTask messageConsumerTask = new DMaaPMessageConsumerTask(this.applicationConfiguration); return messageConsumerTask.execute() - .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + .onErrorResume(this::handleConsumeMessageFailure); } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { @@ -187,13 +188,12 @@ public class ScheduledTasks { return Flux.empty(); } - private Flux<Path> deleteFile(Path localFile) { + private void deleteFile(Path localFile) { logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); } catch (Exception e) { logger.warn("Could not delete file: {}, {}", localFile, e); } - return Flux.just(localFile); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 73511d19..d2240f18 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** @@ -96,7 +96,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -107,7 +107,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenSucceeds() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -118,7 +118,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenFails() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectErrorMessage("Retries exhausted: 1/1").verify(); @@ -128,7 +128,7 @@ class DataRouterPublisherTest { } @SafeVarargs - final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { + final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, nextHttpResponses); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 10c5b167..804b46e9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest { private static final String PWD = "pwd"; private static final String FTPES_LOCATION = FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private static final String FTPES_LOCATION_NO_PORT = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION; + private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; @@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private FileData createFileData() { + private FileData createFileData(String location) { // @formatter:off return ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) { // @formatter:off return ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) @@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); @@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest { // @formatter:off FileData fileData = ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest { public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION); doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest { doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index ae1435ca..442b766b 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -32,4 +32,8 @@ public class DatafileTaskException extends Exception { public DatafileTaskException(String message) { super(message); } + + public DatafileTaskException(String message, Exception e) { + super(message + e); + } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java deleted file mode 100644 index 29160c94..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import java.io.IOException; -import java.io.OutputStream; -import javax.net.ssl.KeyManager; -import javax.net.ssl.TrustManager; -import org.apache.commons.net.ftp.FTPSClient; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; - -public class FTPSClientWrapper implements IFTPSClient { - private FTPSClient ftpsClient = new FTPSClient(); - - @Override - public void setNeedClientAuth(boolean isNeedClientAuth) { - ftpsClient.setNeedClientAuth(isNeedClientAuth); - } - - @Override - public void setKeyManager(KeyManager keyManager) { - ftpsClient.setKeyManager(keyManager); - } - - @Override - public void setTrustManager(TrustManager trustManager) { - ftpsClient.setTrustManager(trustManager); - } - - @Override - public void connect(String hostName, int port) throws IOException { - ftpsClient.connect(hostName, port); - } - - @Override - public boolean login(String username, String password) throws IOException { - return ftpsClient.login(username, password); - } - - @Override - public boolean logout() throws IOException { - return ftpsClient.logout(); - } - - @Override - public int getReplyCode() { - return ftpsClient.getReplyCode(); - } - - @Override - public void disconnect() throws IOException { - ftpsClient.disconnect(); - } - - @Override - public void enterLocalPassiveMode() { - ftpsClient.enterLocalPassiveMode(); - } - - @Override - public void setFileType(int fileType) throws IOException { - ftpsClient.setFileType(fileType); - } - - @Override - public void execPBSZ(int psbz) throws IOException { - ftpsClient.execPBSZ(psbz); - } - - @Override - public void execPROT(String prot) throws IOException { - ftpsClient.execPROT(prot); - } - - @Override - public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException { - try { - if (!ftpsClient.retrieveFile(remote, local)) { - throw new DatafileTaskException("could not retrieve file"); - } - } catch (IOException e) { - throw new DatafileTaskException(e); - } - } - - @Override - public void setTimeout(Integer t) { - this.ftpsClient.setDefaultTimeout(t); - } - - @Override - public boolean isConnected() { - return ftpsClient.isConnected(); - } - - @Override - public void setBufferSize(int bufSize) { - ftpsClient.setBufferSize(bufSize); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java index f330b673..bedae43a 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java @@ -17,11 +17,13 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.nio.file.Path; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ +@FunctionalInterface public interface FileCollectClient { public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 461b2200..c3b7990f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -16,6 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.ftp; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -27,12 +29,10 @@ import java.util.Optional; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; +import org.apache.commons.net.ftp.FTPSClient; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; -import org.onap.dcaegen2.collectors.datafile.io.FileWrapper; -import org.onap.dcaegen2.collectors.datafile.io.IFile; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.io.IOutputStream; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore; @@ -55,13 +55,11 @@ public class FtpsClient implements FileCollectClient { private Path trustedCAPath; private String trustedCAPassword; - private IFTPSClient realFtpsClient = new FTPSClientWrapper(); + private FTPSClient realFtpsClient = new FTPSClient(); private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper(); private IKeyStore keyStore; private ITrustManagerFactory trustManagerFactory; - private IFile localFile = new FileWrapper(); private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper(); - private IOutputStream outputStream; private boolean keyManagerSet = false; private boolean trustManagerSet = false; private final FileServerData fileServerData; @@ -83,7 +81,7 @@ public class FtpsClient implements FileCollectClient { getFileFromxNF(realFtpsClient, remoteFile, localFile); } catch (IOException e) { logger.trace("", e); - throw new DatafileTaskException("Could not open connection: " + e); + throw new DatafileTaskException("Could not open connection: ", e); } catch (KeyManagerException e) { logger.trace("", e); throw new DatafileTaskException(e); @@ -93,7 +91,7 @@ public class FtpsClient implements FileCollectClient { logger.trace("collectFile fetched: {}", localFile); } - private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException { + private void setUpKeyManager(FTPSClient ftps) throws KeyManagerException { if (keyManagerSet) { logger.trace("keyManager already set!"); } else { @@ -104,7 +102,7 @@ public class FtpsClient implements FileCollectClient { logger.trace("complete setUpKeyManager"); } - private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException { + private void setUpTrustedCA(FTPSClient ftps) throws DatafileTaskException { if (trustManagerSet) { logger.trace("trustManager already set!"); } else { @@ -130,7 +128,7 @@ public class FtpsClient implements FileCollectClient { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } - private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException { + private void setUpConnection(FTPSClient ftps) throws DatafileTaskException, IOException { if (!ftps.isConnected()) { ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port())); logger.trace("after ftp connect"); @@ -155,23 +153,26 @@ public class FtpsClient implements FileCollectClient { logger.trace("setUpConnection successfully!"); } - private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName) - throws IOException, DatafileTaskException { + private void getFileFromxNF(FTPSClient ftps, String remoteFileName, Path localFileName) + throws IOException { logger.trace("starting to getFile"); - this.localFile.setPath(localFileName); - this.localFile.createNewFile(); - - OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile()); + File localFile = localFileName.toFile(); + if (localFile.createNewFile()) { + logger.warn("Local file {} already created", localFileName); + } + OutputStream output = new FileOutputStream(localFile); logger.trace("begin to retrieve from xNF."); - ftps.retrieveFile(remoteFileName, output); + if (!ftps.retrieveFile(remoteFileName, output)) { + throw new IOException("Could not retrieve file"); + } logger.trace("end retrieve from xNF."); output.close(); logger.debug("File {} Download Successfull from xNF", localFileName); } - private void closeDownConnection(IFTPSClient ftps) { + private void closeDownConnection(FTPSClient ftps) { logger.trace("starting to closeDownConnection"); if (ftps != null && ftps.isConnected()) { try { @@ -220,7 +221,7 @@ public class FtpsClient implements FileCollectClient { return keyStore; } - void setFtpsClient(IFTPSClient ftpsClient) { + void setFtpsClient(FTPSClient ftpsClient) { this.realFtpsClient = ftpsClient; } @@ -236,14 +237,6 @@ public class FtpsClient implements FileCollectClient { trustManagerFactory = tmf; } - void setFile(IFile file) { - localFile = file; - } - - void setOutputStream(IOutputStream outputStream) { - this.outputStream = outputStream; - } - void setFileSystemResource(IFileSystemResource fileSystemResource) { this.fileSystemResource = fileSystemResource; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java deleted file mode 100644 index 3dcaa656..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import java.io.IOException; -import java.io.OutputStream; -import javax.net.ssl.KeyManager; -import javax.net.ssl.TrustManager; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; - -public interface IFTPSClient { - public void setNeedClientAuth(boolean isNeedClientAuth); - - public void setKeyManager(KeyManager keyManager); - - public void setTrustManager(TrustManager trustManager); - - public void connect(String hostname, int port) throws IOException; - - public boolean login(String username, String password) throws IOException; - - public boolean logout() throws IOException; - - public int getReplyCode(); - - public void setBufferSize(int bufSize); - - public boolean isConnected(); - - public void disconnect() throws IOException; - - public void enterLocalPassiveMode(); - - public void setFileType(int fileType) throws IOException; - - public void execPBSZ(int newParam) throws IOException; - - public void execPROT(String prot) throws IOException; - - public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException; - - void setTimeout(Integer t); -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java deleted file mode 100644 index 203a5985..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; - -public class FileWrapper implements IFile { - private File file; - - @Override - public void setPath(Path path) { - file = path.toFile(); - } - - @Override - public boolean createNewFile() throws IOException { - if (file == null) { - throw new IOException("Path to file not set."); - } - return file.createNewFile(); - } - - @Override - public File getFile() { - return file; - } - - @Override - public boolean delete() { - return file.delete(); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java deleted file mode 100644 index 2b95842f..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; - -public interface IFile { - public void setPath(Path path); - - public boolean createNewFile() throws IOException; - - public File getFile(); - - public boolean delete(); -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java deleted file mode 100644 index 8015ea76..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.OutputStream; - -@FunctionalInterface -public interface IOutputStream { - public OutputStream getOutputStream(File file) throws FileNotFoundException; -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java deleted file mode 100644 index 88787826..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.OutputStream; - -public class OutputStreamWrapper implements IOutputStream { - - @Override - public OutputStream getOutputStream(File file) throws FileNotFoundException { - return new FileOutputStream(file); - } - -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index bced3d85..4869e4c2 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -53,7 +53,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -101,7 +101,7 @@ public class DmaapProducerReactiveHttpClient { * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter * @return status code of operation */ - public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { + public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); try { logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation()); @@ -116,12 +116,12 @@ public class DmaapProducerReactiveHttpClient { Future<HttpResponse> future = webClient.execute(put, null); HttpResponse response = future.get(); - logger.trace(response.toString()); + logger.trace("{}", response); webClient.close(); - return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); + return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); - return Flux.error(e); + return Mono.error(e); } } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index c4577262..670b1bdc 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -39,12 +38,11 @@ import javax.net.ssl.TrustManager; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; +import org.apache.commons.net.ftp.FTPSClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.io.IFile; +import org.mockito.ArgumentMatchers; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.io.IOutputStream; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore; import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory; @@ -64,36 +62,31 @@ public class FtpsClientTest { private static final String USERNAME = "bob"; private static final String PASSWORD = "123"; - private IFTPSClient ftpsClientMock = mock(IFTPSClient.class); + private FTPSClient ftpsClientMock = mock(FTPSClient.class); private IKeyManagerUtils keyManagerUtilsMock = mock(IKeyManagerUtils.class); private KeyManager keyManagerMock = mock(KeyManager.class); private IKeyStore keyStoreWrapperMock = mock(IKeyStore.class); private KeyStore keyStoreMock = mock(KeyStore.class); private ITrustManagerFactory trustManagerFactoryMock = mock(ITrustManagerFactory.class); private TrustManager trustManagerMock = mock(TrustManager.class); - private IFile localFileMock = mock(IFile.class); private IFileSystemResource fileResourceMock = mock(IFileSystemResource.class); - private IOutputStream outputStreamMock = mock(IOutputStream.class); private InputStream inputStreamMock = mock(InputStream.class); FtpsClient clientUnderTest = new FtpsClient(createFileServerData()); private ImmutableFileServerData createFileServerData() { - return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); + return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD) + .port(PORT).build(); } - @BeforeEach protected void setUp() throws Exception { clientUnderTest.setFtpsClient(ftpsClientMock); clientUnderTest.setKeyManagerUtils(keyManagerUtilsMock); clientUnderTest.setKeyStore(keyStoreWrapperMock); clientUnderTest.setTrustManagerFactory(trustManagerFactoryMock); - clientUnderTest.setFile(localFileMock); clientUnderTest.setFileSystemResource(fileResourceMock); - clientUnderTest.setOutputStream(outputStreamMock); clientUnderTest.setKeyCertPath(FTP_KEY_PATH); clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD); @@ -109,11 +102,10 @@ public class FtpsClientTest { when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); - File fileMock = mock(File.class); - when(localFileMock.getFile()).thenReturn(fileMock); - OutputStream osMock = mock(OutputStream.class); - when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock); + when(ftpsClientMock.isConnected()).thenReturn(false, true); + when(ftpsClientMock.retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), + ArgumentMatchers.any(OutputStream.class))).thenReturn(true); clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH); @@ -133,10 +125,8 @@ public class FtpsClientTest { verify(ftpsClientMock).execPROT("P"); verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); verify(ftpsClientMock).setBufferSize(1024 * 1024); - verify(localFileMock).setPath(LOCAL_FILE_PATH); - verify(localFileMock, times(1)).createNewFile(); - verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); - verify(osMock, times(1)).close(); + verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), + ArgumentMatchers.any(OutputStream.class)); verify(ftpsClientMock, times(1)).logout(); verify(ftpsClientMock, times(1)).disconnect(); verify(ftpsClientMock, times(2)).isConnected(); @@ -149,8 +139,8 @@ public class FtpsClientTest { .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); when(ftpsClientMock.isConnected()).thenReturn(false); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException"); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)).hasMessage( + "org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException"); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); @@ -167,7 +157,7 @@ public class FtpsClientTest { doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock); assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException"); + .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException"); } @Test @@ -203,7 +193,7 @@ public class FtpsClientTest { when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE); assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503"); + .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503"); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); @@ -230,7 +220,7 @@ public class FtpsClientTest { doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT); assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not open connection: java.io.IOException"); + .hasMessage("Could not open connection: java.io.IOException"); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); @@ -254,10 +244,8 @@ public class FtpsClientTest { when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); - doThrow(new IOException()).when(localFileMock).createNewFile(); - - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not open connection: java.io.IOException"); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, Paths.get(""))) + .hasMessage("Could not open connection: java.io.IOException: No such file or directory"); } @@ -269,14 +257,13 @@ public class FtpsClientTest { when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); - File fileMock = mock(File.class); - when(localFileMock.getFile()).thenReturn(fileMock); - OutputStream osMock = mock(OutputStream.class); - when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock); - doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); + when(ftpsClientMock.isConnected()).thenReturn(false); + + doThrow(new IOException("problemas")).when(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), + ArgumentMatchers.any(OutputStream.class)); assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("problemas"); + .hasMessage("Could not open connection: java.io.IOException: problemas"); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); @@ -294,9 +281,8 @@ public class FtpsClientTest { verify(ftpsClientMock).execPROT("P"); verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); verify(ftpsClientMock).setBufferSize(1024 * 1024); - verify(localFileMock).setPath(LOCAL_FILE_PATH); - verify(localFileMock, times(1)).createNewFile(); - verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); + verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), + ArgumentMatchers.any(OutputStream.class)); verify(ftpsClientMock, times(2)).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index 7f32e8c3..102388e2 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -17,7 +17,6 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import static java.nio.charset.StandardCharsets.UTF_8; - import static org.apache.commons.io.IOUtils.toByteArray; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -52,7 +51,8 @@ public class SftpClientTest { public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD); @Test - public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception { + public void collectFile_withOKresponse() + throws DatafileTaskException, IOException, JSchException, SftpException, Exception { FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build(); SftpClient sftpClient = new SftpClient(expectedFileServerData); @@ -67,8 +67,8 @@ public class SftpClientTest { @Test public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException { - FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") - .userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build(); + FileServerData expectedFileServerData = + ImmutableFileServerData.builder().serverAddress("127.0.0.1").userId("Wrong").password(PASSWORD).build(); SftpClient sftpClient = new SftpClient(expectedFileServerData); sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); |