aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
commite1a66425d3ba1df5ae2a8f2b99168707e08b655a (patch)
treebdfcbdbe4a3dd7286a562f974dec61691e216848 /datafile-app-server/src/test
parent4bd281390ed24b278846775c1157f82db81fddbe (diff)
Local filename updated, stability issues
The local filename is changed so it contains PNF name instead of the PNF IP address. The paralellism is restricted to 100 worker threads in order to solve problems with too many open file descriptors and out of memory. Logging is improved. Change-Id: I24ce2e23020cc253a3c7bebac1ab5cf703b5b144 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/test')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java16
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java19
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java11
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java32
6 files changed, 60 insertions, 53 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index 1f5827c8..84c5e07b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -44,6 +44,20 @@ public class FileDataTest {
private static final String LOCATION_WITHOUT_USER =
FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder()
+ .productName("PRODUCT_NAME")
+ .vendorName("VENDOR_NAME")
+ .lastEpochMicrosec("LAST_EPOCH_MICROSEC")
+ .sourceName("SOURCE_NAME")
+ .startEpochMicrosec("START_EPOCH_MICROSEC")
+ .timeZoneOffset("TIME_ZONE_OFFSET")
+ .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER")
+ .changeType("FILE_READY_CHANGE_TYPE")
+ .build();
+ }
+
private FileData properFileDataWithUser() {
// @formatter:off
return ImmutableFileData.builder()
@@ -53,6 +67,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
@@ -66,6 +81,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index b33180fa..b8aa7da2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -67,7 +66,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -100,12 +99,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -121,7 +119,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -154,12 +152,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -202,7 +199,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -235,12 +232,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -421,12 +417,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index a695e20d..98c7dc32 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -156,13 +155,12 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -187,6 +185,7 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
@@ -208,8 +207,6 @@ public class DMaaPMessageConsumerTaskImplTest {
files = new ArrayList<>();
files.add(sftpFileData);
expectedSftpMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
}
@@ -264,8 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
- when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index ed8b93f1..fe867738 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -86,13 +86,11 @@ class DataRouterPublisherTest {
private static final String FEED_ID = "1";
private static final String FILE_CONTENT = "Just a string.";
- private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
-
private static ConsumerDmaapModel consumerDmaapModel;
private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
-
+ private final Map<String, String> contextMap = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
/**
@@ -125,9 +123,8 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
-
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -170,7 +167,7 @@ class DataRouterPublisherTest {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
}
@@ -181,7 +178,7 @@ class DataRouterPublisherTest {
Integer.valueOf(HttpStatus.OK.value()));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -197,7 +194,7 @@ class DataRouterPublisherTest {
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index fb49c860..6e17f27b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -67,7 +67,7 @@ public class FileCollectorTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -93,6 +93,7 @@ public class FileCollectorTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
+ private final Map<String, String> contextMap = new HashMap<>();
private MessageMetaData createMessageMetaData() {
@@ -119,6 +120,7 @@ public class FileCollectorTest {
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.scheme(scheme)
+ .messageMetaData(createMessageMetaData())
.build();
// @formatter:on
}
@@ -160,11 +162,11 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
+ verify(ftpsClientMock, times(1)).open();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock, times(1)).close();
@@ -176,12 +178,12 @@ public class FileCollectorTest {
FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
+
FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -191,11 +193,12 @@ public class FileCollectorTest {
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
+ verify(sftpClientMock, times(2)).open();
verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
@@ -210,9 +213,8 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -228,9 +230,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- Map<String, String> contextMap = new HashMap<>();
+
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index d781cea3..f6beae02 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -30,8 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,6 +66,7 @@ public class ScheduledTasksTest {
private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
+ private Map<String, String> contextMap = new HashMap<String, String>();
@BeforeEach
private void setUp() {
@@ -115,6 +118,7 @@ public class ScheduledTasksTest {
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
.scheme(Scheme.FTPS) //
.compression("") //
+ .messageMetaData(messageMetaData())
.build();
}
@@ -130,9 +134,7 @@ public class ScheduledTasksTest {
}
private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
- MessageMetaData md = messageMetaData();
- return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
- .files(files(numberOfFiles, uniqueNames)).build();
+ return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
}
private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
@@ -185,17 +187,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -215,19 +217,19 @@ public class ScheduledTasksTest {
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
+ .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -243,7 +245,7 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
@@ -251,14 +253,14 @@ public class ScheduledTasksTest {
.when(dataRouterMock) //
.execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -277,17 +279,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);