aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-09-26 16:03:45 +0200
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-09-27 14:30:56 +0200
commit263d1a481bd5378e59b804425bc8a9bdc9bebb9a (patch)
tree4825c45fbccf1f99e3765417118c8a74bd9fe990 /datafile-app-server/src/test
parentd1e9b1e4004faf059c2505ef9ab561bf19472b98 (diff)
Fix delivery to DataRouter
The messages to the DataRouter was not actually sent. Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f Issue-ID: DCAEGEN2-841 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/test')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java115
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java134
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java83
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java66
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java131
5 files changed, 243 insertions, 286 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
deleted file mode 100644
index 2f61ac97..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class FileCollectorTest {
-
- private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
- private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FTPES_SCHEME = "ftpes://";
- private static final String SFTP_SCHEME = "sftp://";
- private static final String SERVER_ADDRESS = "192.168.0.101";
- private static final int PORT_22 = 22;
- private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
- private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String GZIP_COMPRESSION = "gzip";
- private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- private FtpsClient ftpsClientMock = mock(FtpsClient.class);
-
- private SftpClient sftpClientMock = mock(SftpClient.class);
-
- private FileCollector fileCollectorUndetTest = new FileCollector(ftpsClientMock, sftpClientMock);
-
- @Test
- public void whenSingleFtpesFile_returnCorrectResponse() {
- List<FileData> listOfFileData = new ArrayList<FileData>();
- listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
- FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
- Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
- List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
- assertEquals(1, consumerModels.size());
- ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
- assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
- assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
- assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
- assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
- FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
- verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verifyNoMoreInteractions(ftpsClientMock);
- }
-
- @Test
- public void whenSingleSftpFile_returnCorrectResponse() {
- List<FileData> listOfFileData = new ArrayList<FileData>();
- listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
- FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
- Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
- List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
- assertEquals(1, consumerModels.size());
- ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
- assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
- assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
- assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
- assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
- FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
- verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verifyNoMoreInteractions(ftpsClientMock);
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index 8c36a51f..b5457b82 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -16,11 +16,11 @@
package org.onap.dcaegen2.collectors.datafile.service;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
-import java.util.List;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
import java.util.Optional;
import org.junit.jupiter.api.Test;
@@ -31,9 +31,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -42,19 +39,27 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapConsumerJsonParserTest {
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String CHANGE_TYPE = "FileReady";
+ private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
+ FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -63,19 +68,18 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(expectedFileData, fileDataResult.get(0));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutName_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField =
+ new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -84,18 +88,18 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutLocation_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField =
+ new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -104,19 +108,17 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutCompression_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -125,19 +127,17 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -146,24 +146,24 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
- AdditionalField additionalFaultyField =
- new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField).build();
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
+ .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
+
+ FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -172,10 +172,8 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(1, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
}
@Test
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
index e6818453..43502b49 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
@@ -35,16 +34,16 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfig
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -53,7 +52,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapConsumerTaskImplTest {
- private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+ private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
private static final String FTPES_SCHEME = "ftpes://";
private static final String SFTP_SCHEME = "sftp://";
@@ -75,13 +74,11 @@ class DmaapConsumerTaskImplTest {
private DmaapConsumerTaskImpl dmaapConsumerTask;
private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
- private static FileCollector fileCollectorMock;
-
private static String ftpesMessage;
- private static List<FileData> ftpesFileDataAfterConsume = new ArrayList<FileData>();
+ private static FileData ftpesFileData;
private static String sftpMessage;
- private static List<FileData> sftpFileDataAfterConsume = new ArrayList<FileData>();
+ private static FileData sftpFileData;
@BeforeAll
public static void setUp() {
@@ -95,42 +92,38 @@ class DmaapConsumerTaskImplTest {
AdditionalField ftpesAdditionalField =
new JsonMessage.AdditionalFieldBuilder().location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
.addAdditionalField(ftpesAdditionalField).build();
ftpesMessage = ftpesJsonMessage.toString();
- FileData ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- ftpesFileDataAfterConsume.add(ftpesFileData);
+ ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
AdditionalField sftpAdditionalField =
new JsonMessage.AdditionalFieldBuilder().location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
.addAdditionalField(sftpAdditionalField).build();
sftpMessage = sftpJsonMessage.toString();
- FileData sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- sftpFileDataAfterConsume.add(sftpFileData);
+ sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
- ImmutableConsumerDmaapModel consumerDmaapModel =
- ImmutableConsumerDmaapModel.builder().location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
-
- fileCollectorMock = mock(FileCollector.class);
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
- // given
- prepareMocksForDmaapConsumer("", new ArrayList<FileData>());
+ prepareMocksForDmaapConsumer("", null);
- // then
StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
.expectError(DmaapEmptyResponseException.class).verify();
@@ -139,51 +132,39 @@ class DmaapConsumerTaskImplTest {
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- // given
- prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileDataAfterConsume);
- // when
- final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
- // then
+ prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+
+ StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
- verify(fileCollectorMock, times(1)).getFilesFromSender(ftpesFileDataAfterConsume);
- verifyNoMoreInteractions(fileCollectorMock);
- Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- // given
- prepareMocksForDmaapConsumer(sftpMessage, sftpFileDataAfterConsume);
- // when
- final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
- // then
+ prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+
+ StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
- verify(fileCollectorMock, times(1)).getFilesFromSender(sftpFileDataAfterConsume);
- verifyNoMoreInteractions(fileCollectorMock);
- Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
}
- private void prepareMocksForDmaapConsumer(String message, List<FileData> fileDataAfterConsume) {
+ private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Mono.just(fileDataAfterConsume));
+ when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
} else {
when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
- .thenReturn(Mono.error(new DmaapEmptyResponseException()));
+ .thenReturn(Flux.error(new DmaapEmptyResponseException()));
}
- when(fileCollectorMock.getFilesFromSender(fileDataAfterConsume))
- .thenReturn(Mono.just(listOfConsumerDmaapModel));
- dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient,
- dmaapConsumerJsonParserMock, fileCollectorMock));
+ dmaapConsumerTask =
+ spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
index 4f7787e9..c124e982 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
@@ -2,17 +2,15 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
@@ -27,32 +25,27 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.function.Executable;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapPublisherTaskImplTest {
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static ConsumerDmaapModel consumerDmaapModel;
- private static List<ConsumerDmaapModel> listOfConsumerDmaapModel;
private static DmaapPublisherTaskImpl dmaapPublisherTask;
private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
private static AppConfig appConfig;
@@ -64,48 +57,17 @@ class DmaapPublisherTaskImplTest {
new ImmutableDmaapPublisherConfiguration.Builder().dmaapContentType("application/json")
.dmaapHostName("54.45.33.2").dmaapPortNumber(1234).dmaapProtocol("https").dmaapUserName("DFC")
.dmaapUserPassword("DFC").dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT").build();
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location("target/A20161224.1030-1045.bin.gz")
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME).location("target/" + PM_FILE_NAME)
.compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
- listOfConsumerDmaapModel.add(consumerDmaapModel);
appConfig = mock(AppConfig.class);
}
@Test
- public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
- // given
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- dmaapPublisherTask = new DmaapPublisherTaskImpl(appConfig);
-
- // when
- Executable executableFunction = () -> dmaapPublisherTask.execute(null);
-
- // then
- Assertions.assertThrows(DatafileTaskException.class, executableFunction,
- "The specified parameter is incorrect");
- }
-
- @Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() throws DatafileTaskException {
- // given
+ public void whenPassedObjectFits_ReturnsCorrectStatus() {
prepareMocksForTests(HttpStatus.OK.value());
- // when
- dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
-
- // then
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
- }
-
- @Test
- public void whenPassedObjectFits_ReturnsNoContent() throws DatafileTaskException {
- // given
- prepareMocksForTests(HttpStatus.NO_CONTENT.value());
-
- dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
- // then
verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -113,7 +75,7 @@ class DmaapPublisherTaskImplTest {
private void prepareMocksForTests(Integer httpResponseCode) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
- .thenReturn(Mono.just(httpResponseCode.toString()));
+ .thenReturn(Flux.just(httpResponseCode.toString()));
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
new file mode 100644
index 00000000..528a481c
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -0,0 +1,131 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class XnfCollectorTaskImplTest {
+
+ private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+ private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String SFTP_SCHEME = "sftp://";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String FTPES_LOCATION =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+
+ private FtpsClient ftpsClientMock = mock(FtpsClient.class);
+
+ private SftpClient sftpClientMock = mock(SftpClient.class);
+
+ private XnfCollectorTask collectorUndetTest = new XnfCollectorTaskImpl(ftpsClientMock, sftpClientMock);
+
+ @Test
+ public void whenSingleFtpesFile_returnCorrectResponse() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
+ .password(PWD).port(PORT_22).build();
+ when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+ .verifyComplete();
+
+ verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void whenSingleSftpFile_returnCorrectResponse() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+ .password("").port(PORT_22).build();
+ when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+ .verifyComplete();
+
+ verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void whenWrongScheme_returnEmpty() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location("http://host.com/file.zip")
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+ .password("").port(PORT_22).build();
+ when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+}