summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java28
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java27
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java4
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java114
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java47
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java57
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java50
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java33
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java28
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java33
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java10
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java62
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java8
17 files changed, 109 insertions, 435 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3c606deb..a8f79ea1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -123,12 +123,11 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transformMessages(monoJsonP));
+ return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
+ : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
@@ -138,22 +137,22 @@ public class JsonMessageParser {
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
// @formatter:off
- return Flux.just(ImmutableFileReadyMessage.builder()
+ return Mono.just(ImmutableFileReadyMessage.builder()
.pnfName(messageMetaData.sourceName())
.messageMetaData(messageMetaData)
.files(allFileDataFromJson)
.build());
// @formatter:on
} else {
- return Flux.empty();
+ return Mono.empty();
}
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 338c8323..4c0dcce5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -50,27 +50,27 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
- return Flux.just(model)
- .cache(1)
+ return Mono.just(model)
+ .cache()
.flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Flux.just(model);
+ return Mono.just(model);
} else {
- logger.warn("Publish to DR unsuccessful, response code: " + response);
- return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ logger.warn("Publish to DR unsuccessful, response code: {}", response);
+ return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index f22c7bf9..37b7a559 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -98,7 +98,7 @@ public class ScheduledTasks {
.doOnNext(fileData -> taskCounter.incrementAndGet())
.flatMap(this::collectFileFromXnf)
.flatMap(this::publishToDataRouter)
- .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
.doOnNext(model -> taskCounter.decrementAndGet())
.sequential()
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -109,8 +109,8 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(Path localFile) {
- logger.info("Datafile consumed tasks." + localFile);
+ private void onSuccess(ConsumerDmaapModel model) {
+ logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
private void onError(Throwable throwable) {
@@ -138,18 +138,19 @@ public class ScheduledTasks {
return fileCollect.collectorTask
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
- logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
- deleteFile(fileData.getLocalFileName());
- alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ Path localFileName = fileData.getLocalFileName();
+ logger.error("File fetching failed: {}", localFileName);
+ deleteFile(localFileName);
+ alreadyPublishedFiles.remove(localFileName);
taskCounter.decrementAndGet();
return Mono.empty();
}
- private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
@@ -160,13 +161,13 @@ public class ScheduledTasks {
}
- private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
taskCounter.decrementAndGet();
- return Flux.empty();
+ return Mono.empty();
}
private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
@@ -179,7 +180,7 @@ public class ScheduledTasks {
final DMaaPMessageConsumerTask messageConsumerTask =
new DMaaPMessageConsumerTask(this.applicationConfiguration);
return messageConsumerTask.execute()
- .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ .onErrorResume(this::handleConsumeMessageFailure);
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
@@ -187,13 +188,12 @@ public class ScheduledTasks {
return Flux.empty();
}
- private Flux<Path> deleteFile(Path localFile) {
+ private void deleteFile(Path localFile) {
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
} catch (Exception e) {
logger.warn("Could not delete file: {}, {}", localFile, e);
}
- return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 73511d19..d2240f18 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
@@ -96,7 +96,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -107,7 +107,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -118,7 +118,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectErrorMessage("Retries exhausted: 1/1").verify();
@@ -128,7 +128,7 @@ class DataRouterPublisherTest {
}
@SafeVarargs
- final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
+ final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 10c5b167..804b46e9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest {
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private static final String FTPES_LOCATION_NO_PORT =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private FileData createFileData() {
+ private FileData createFileData(String location) {
// @formatter:off
return ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) {
// @formatter:off
return ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
@@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
@@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:off
FileData fileData = ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest {
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION);
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest {
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index ae1435ca..442b766b 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -32,4 +32,8 @@ public class DatafileTaskException extends Exception {
public DatafileTaskException(String message) {
super(message);
}
+
+ public DatafileTaskException(String message, Exception e) {
+ super(message + e);
+ }
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
deleted file mode 100644
index 29160c94..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.apache.commons.net.ftp.FTPSClient;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-
-public class FTPSClientWrapper implements IFTPSClient {
- private FTPSClient ftpsClient = new FTPSClient();
-
- @Override
- public void setNeedClientAuth(boolean isNeedClientAuth) {
- ftpsClient.setNeedClientAuth(isNeedClientAuth);
- }
-
- @Override
- public void setKeyManager(KeyManager keyManager) {
- ftpsClient.setKeyManager(keyManager);
- }
-
- @Override
- public void setTrustManager(TrustManager trustManager) {
- ftpsClient.setTrustManager(trustManager);
- }
-
- @Override
- public void connect(String hostName, int port) throws IOException {
- ftpsClient.connect(hostName, port);
- }
-
- @Override
- public boolean login(String username, String password) throws IOException {
- return ftpsClient.login(username, password);
- }
-
- @Override
- public boolean logout() throws IOException {
- return ftpsClient.logout();
- }
-
- @Override
- public int getReplyCode() {
- return ftpsClient.getReplyCode();
- }
-
- @Override
- public void disconnect() throws IOException {
- ftpsClient.disconnect();
- }
-
- @Override
- public void enterLocalPassiveMode() {
- ftpsClient.enterLocalPassiveMode();
- }
-
- @Override
- public void setFileType(int fileType) throws IOException {
- ftpsClient.setFileType(fileType);
- }
-
- @Override
- public void execPBSZ(int psbz) throws IOException {
- ftpsClient.execPBSZ(psbz);
- }
-
- @Override
- public void execPROT(String prot) throws IOException {
- ftpsClient.execPROT(prot);
- }
-
- @Override
- public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
- try {
- if (!ftpsClient.retrieveFile(remote, local)) {
- throw new DatafileTaskException("could not retrieve file");
- }
- } catch (IOException e) {
- throw new DatafileTaskException(e);
- }
- }
-
- @Override
- public void setTimeout(Integer t) {
- this.ftpsClient.setDefaultTimeout(t);
- }
-
- @Override
- public boolean isConnected() {
- return ftpsClient.isConnected();
- }
-
- @Override
- public void setBufferSize(int bufSize) {
- ftpsClient.setBufferSize(bufSize);
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index f330b673..bedae43a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -17,11 +17,13 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.nio.file.Path;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
+@FunctionalInterface
public interface FileCollectClient {
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 461b2200..c3b7990f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -27,12 +29,10 @@ import java.util.Optional;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -55,13 +55,11 @@ public class FtpsClient implements FileCollectClient {
private Path trustedCAPath;
private String trustedCAPassword;
- private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+ private FTPSClient realFtpsClient = new FTPSClient();
private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
private IKeyStore keyStore;
private ITrustManagerFactory trustManagerFactory;
- private IFile localFile = new FileWrapper();
private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
- private IOutputStream outputStream;
private boolean keyManagerSet = false;
private boolean trustManagerSet = false;
private final FileServerData fileServerData;
@@ -83,7 +81,7 @@ public class FtpsClient implements FileCollectClient {
getFileFromxNF(realFtpsClient, remoteFile, localFile);
} catch (IOException e) {
logger.trace("", e);
- throw new DatafileTaskException("Could not open connection: " + e);
+ throw new DatafileTaskException("Could not open connection: ", e);
} catch (KeyManagerException e) {
logger.trace("", e);
throw new DatafileTaskException(e);
@@ -93,7 +91,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("collectFile fetched: {}", localFile);
}
- private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
+ private void setUpKeyManager(FTPSClient ftps) throws KeyManagerException {
if (keyManagerSet) {
logger.trace("keyManager already set!");
} else {
@@ -104,7 +102,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("complete setUpKeyManager");
}
- private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
+ private void setUpTrustedCA(FTPSClient ftps) throws DatafileTaskException {
if (trustManagerSet) {
logger.trace("trustManager already set!");
} else {
@@ -130,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+ private void setUpConnection(FTPSClient ftps) throws DatafileTaskException, IOException {
if (!ftps.isConnected()) {
ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
logger.trace("after ftp connect");
@@ -155,23 +153,26 @@ public class FtpsClient implements FileCollectClient {
logger.trace("setUpConnection successfully!");
}
- private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
- throws IOException, DatafileTaskException {
+ private void getFileFromxNF(FTPSClient ftps, String remoteFileName, Path localFileName)
+ throws IOException {
logger.trace("starting to getFile");
- this.localFile.setPath(localFileName);
- this.localFile.createNewFile();
-
- OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+ File localFile = localFileName.toFile();
+ if (localFile.createNewFile()) {
+ logger.warn("Local file {} already created", localFileName);
+ }
+ OutputStream output = new FileOutputStream(localFile);
logger.trace("begin to retrieve from xNF.");
- ftps.retrieveFile(remoteFileName, output);
+ if (!ftps.retrieveFile(remoteFileName, output)) {
+ throw new IOException("Could not retrieve file");
+ }
logger.trace("end retrieve from xNF.");
output.close();
logger.debug("File {} Download Successfull from xNF", localFileName);
}
- private void closeDownConnection(IFTPSClient ftps) {
+ private void closeDownConnection(FTPSClient ftps) {
logger.trace("starting to closeDownConnection");
if (ftps != null && ftps.isConnected()) {
try {
@@ -220,7 +221,7 @@ public class FtpsClient implements FileCollectClient {
return keyStore;
}
- void setFtpsClient(IFTPSClient ftpsClient) {
+ void setFtpsClient(FTPSClient ftpsClient) {
this.realFtpsClient = ftpsClient;
}
@@ -236,14 +237,6 @@ public class FtpsClient implements FileCollectClient {
trustManagerFactory = tmf;
}
- void setFile(IFile file) {
- localFile = file;
- }
-
- void setOutputStream(IOutputStream outputStream) {
- this.outputStream = outputStream;
- }
-
void setFileSystemResource(IFileSystemResource fileSystemResource) {
this.fileSystemResource = fileSystemResource;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
deleted file mode 100644
index 3dcaa656..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-
-public interface IFTPSClient {
- public void setNeedClientAuth(boolean isNeedClientAuth);
-
- public void setKeyManager(KeyManager keyManager);
-
- public void setTrustManager(TrustManager trustManager);
-
- public void connect(String hostname, int port) throws IOException;
-
- public boolean login(String username, String password) throws IOException;
-
- public boolean logout() throws IOException;
-
- public int getReplyCode();
-
- public void setBufferSize(int bufSize);
-
- public boolean isConnected();
-
- public void disconnect() throws IOException;
-
- public void enterLocalPassiveMode();
-
- public void setFileType(int fileType) throws IOException;
-
- public void execPBSZ(int newParam) throws IOException;
-
- public void execPROT(String prot) throws IOException;
-
- public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
-
- void setTimeout(Integer t);
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
deleted file mode 100644
index 203a5985..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-
-public class FileWrapper implements IFile {
- private File file;
-
- @Override
- public void setPath(Path path) {
- file = path.toFile();
- }
-
- @Override
- public boolean createNewFile() throws IOException {
- if (file == null) {
- throw new IOException("Path to file not set.");
- }
- return file.createNewFile();
- }
-
- @Override
- public File getFile() {
- return file;
- }
-
- @Override
- public boolean delete() {
- return file.delete();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
deleted file mode 100644
index 2b95842f..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface IFile {
- public void setPath(Path path);
-
- public boolean createNewFile() throws IOException;
-
- public File getFile();
-
- public boolean delete();
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
deleted file mode 100644
index 8015ea76..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.OutputStream;
-
-@FunctionalInterface
-public interface IOutputStream {
- public OutputStream getOutputStream(File file) throws FileNotFoundException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
deleted file mode 100644
index 88787826..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-
-public class OutputStreamWrapper implements IOutputStream {
-
- @Override
- public OutputStream getOutputStream(File file) throws FileNotFoundException {
- return new FileOutputStream(file);
- }
-
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index bced3d85..4869e4c2 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -53,7 +53,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -101,7 +101,7 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
@@ -116,12 +116,12 @@ public class DmaapProducerReactiveHttpClient {
Future<HttpResponse> future = webClient.execute(put, null);
HttpResponse response = future.get();
- logger.trace(response.toString());
+ logger.trace("{}", response);
webClient.close();
- return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
- return Flux.error(e);
+ return Mono.error(e);
}
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index c4577262..670b1bdc 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -39,12 +38,11 @@ import javax.net.ssl.TrustManager;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.io.IFile;
+import org.mockito.ArgumentMatchers;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
@@ -64,36 +62,31 @@ public class FtpsClientTest {
private static final String USERNAME = "bob";
private static final String PASSWORD = "123";
- private IFTPSClient ftpsClientMock = mock(IFTPSClient.class);
+ private FTPSClient ftpsClientMock = mock(FTPSClient.class);
private IKeyManagerUtils keyManagerUtilsMock = mock(IKeyManagerUtils.class);
private KeyManager keyManagerMock = mock(KeyManager.class);
private IKeyStore keyStoreWrapperMock = mock(IKeyStore.class);
private KeyStore keyStoreMock = mock(KeyStore.class);
private ITrustManagerFactory trustManagerFactoryMock = mock(ITrustManagerFactory.class);
private TrustManager trustManagerMock = mock(TrustManager.class);
- private IFile localFileMock = mock(IFile.class);
private IFileSystemResource fileResourceMock = mock(IFileSystemResource.class);
- private IOutputStream outputStreamMock = mock(IOutputStream.class);
private InputStream inputStreamMock = mock(InputStream.class);
FtpsClient clientUnderTest = new FtpsClient(createFileServerData());
private ImmutableFileServerData createFileServerData() {
- return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD)
+ .port(PORT).build();
}
-
@BeforeEach
protected void setUp() throws Exception {
clientUnderTest.setFtpsClient(ftpsClientMock);
clientUnderTest.setKeyManagerUtils(keyManagerUtilsMock);
clientUnderTest.setKeyStore(keyStoreWrapperMock);
clientUnderTest.setTrustManagerFactory(trustManagerFactoryMock);
- clientUnderTest.setFile(localFileMock);
clientUnderTest.setFileSystemResource(fileResourceMock);
- clientUnderTest.setOutputStream(outputStreamMock);
clientUnderTest.setKeyCertPath(FTP_KEY_PATH);
clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
@@ -109,11 +102,10 @@ public class FtpsClientTest {
when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
- File fileMock = mock(File.class);
- when(localFileMock.getFile()).thenReturn(fileMock);
- OutputStream osMock = mock(OutputStream.class);
- when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
+
when(ftpsClientMock.isConnected()).thenReturn(false, true);
+ when(ftpsClientMock.retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+ ArgumentMatchers.any(OutputStream.class))).thenReturn(true);
clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
@@ -133,10 +125,8 @@ public class FtpsClientTest {
verify(ftpsClientMock).execPROT("P");
verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
verify(ftpsClientMock).setBufferSize(1024 * 1024);
- verify(localFileMock).setPath(LOCAL_FILE_PATH);
- verify(localFileMock, times(1)).createNewFile();
- verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
- verify(osMock, times(1)).close();
+ verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+ ArgumentMatchers.any(OutputStream.class));
verify(ftpsClientMock, times(1)).logout();
verify(ftpsClientMock, times(1)).disconnect();
verify(ftpsClientMock, times(2)).isConnected();
@@ -149,8 +139,8 @@ public class FtpsClientTest {
.setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
when(ftpsClientMock.isConnected()).thenReturn(false);
- assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)).hasMessage(
+ "org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -167,7 +157,7 @@ public class FtpsClientTest {
doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock);
assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
+ .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
}
@Test
@@ -203,7 +193,7 @@ public class FtpsClientTest {
when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -230,7 +220,7 @@ public class FtpsClientTest {
doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT);
assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("Could not open connection: java.io.IOException");
+ .hasMessage("Could not open connection: java.io.IOException");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -254,10 +244,8 @@ public class FtpsClientTest {
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
- doThrow(new IOException()).when(localFileMock).createNewFile();
-
- assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("Could not open connection: java.io.IOException");
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, Paths.get("")))
+ .hasMessage("Could not open connection: java.io.IOException: No such file or directory");
}
@@ -269,14 +257,13 @@ public class FtpsClientTest {
when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
- File fileMock = mock(File.class);
- when(localFileMock.getFile()).thenReturn(fileMock);
- OutputStream osMock = mock(OutputStream.class);
- when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
- doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+ when(ftpsClientMock.isConnected()).thenReturn(false);
+
+ doThrow(new IOException("problemas")).when(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+ ArgumentMatchers.any(OutputStream.class));
assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("problemas");
+ .hasMessage("Could not open connection: java.io.IOException: problemas");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -294,9 +281,8 @@ public class FtpsClientTest {
verify(ftpsClientMock).execPROT("P");
verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
verify(ftpsClientMock).setBufferSize(1024 * 1024);
- verify(localFileMock).setPath(LOCAL_FILE_PATH);
- verify(localFileMock, times(1)).createNewFile();
- verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+ verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+ ArgumentMatchers.any(OutputStream.class));
verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index 7f32e8c3..102388e2 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -17,7 +17,6 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import static java.nio.charset.StandardCharsets.UTF_8;
-
import static org.apache.commons.io.IOUtils.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -52,7 +51,8 @@ public class SftpClientTest {
public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
@Test
- public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
+ public void collectFile_withOKresponse()
+ throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
SftpClient sftpClient = new SftpClient(expectedFileServerData);
@@ -67,8 +67,8 @@ public class SftpClientTest {
@Test
public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException {
- FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
- .userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build();
+ FileServerData expectedFileServerData =
+ ImmutableFileServerData.builder().serverAddress("127.0.0.1").userId("Wrong").password(PASSWORD).build();
SftpClient sftpClient = new SftpClient(expectedFileServerData);
sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);