diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-02-20 08:35:16 +0100 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-03-06 10:24:33 +0100 |
commit | f41226f4c3f6675019f6b60508d211a0df102e8a (patch) | |
tree | 1a6dbc43050bd61b8400d429131119683c9c31f4 /datafile-app-server | |
parent | 2bf13db00a2733d6c56cd8f0c0d26905586bd39f (diff) |
Improvement of the parallelism
The reactive framework Scedulers uses to few threads.
(the same number as the number of processors).
That is too few for an io-intense application like this
where CPU is not the limiting factor.
Change-Id: Ia5f41e75716d309f47dce5f5273b739f7e6d136a
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
3 files changed, 379 insertions, 67 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 50f5431a..783c699c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -40,23 +40,23 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; + private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - /** Data needed for fetching of files from one PNF */ + /** Data needed for fetching of one file */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; // Same object, ftp session etc. can be used for each - // file in one VES - // event + final FileCollector collectorTask; final MessageMetaData metaData; FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { @@ -68,16 +68,15 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger taskCounter = new AtomicInteger(); - + private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final Scheduler scheduler = + Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. * * @param applicationConfiguration - application configuration - * @param xnfCollectorTask - second task - * @param dmaapPublisherTask - third task */ @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { @@ -90,20 +89,21 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); applicationConfiguration.initFileStreamReader(); - //@formatter:off - consumeMessagesFromDmaap() - .parallel() // Each FileReadyMessage in a separate thread - .runOn(Schedulers.parallel()) - .flatMap(this::createFileCollectionTask) - .filter(this::shouldBePublished) - .doOnNext(fileData -> taskCounter.incrementAndGet()) - .flatMap(this::collectFileFromXnf) - .flatMap(this::publishToDataRouter) - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) - .doOnNext(model -> taskCounter.decrementAndGet()) - .sequential() - .subscribe(this::onSuccess, this::onError, this::onComplete); - //@formatter:on + createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete); + } + + Flux<ConsumerDmaapModel> createMainTask() { + return fetchMoreFileReadyMessages() // + .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .runOn(scheduler) // + .flatMap(this::createFileCollectionTask) // + .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // + .flatMap(this::collectFileFromXnf) // + .flatMap(this::publishToDataRouter) // + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) // + .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // + .sequential(); } /** @@ -125,13 +125,20 @@ public class ScheduledTasks { logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); } + private int getParallelism() { + if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { + return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); + } else { + return 1; // We need at least one rail/thread + } + } + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), - new SftpClient(fileData.fileServerData())); - fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); + fileCollects.add( + new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } @@ -154,7 +161,7 @@ public class ScheduledTasks { logger.error("File fetching failed: {}", localFileName); deleteFile(localFileName); alreadyPublishedFiles.remove(localFileName); - taskCounter.decrementAndGet(); + currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } @@ -162,7 +169,7 @@ public class ScheduledTasks { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); - DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + DataRouterPublisher publisherTask = createDataRouterPublisher(); return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) .onErrorResume(exception -> handlePublishFailure(model, exception)); @@ -173,20 +180,21 @@ public class ScheduledTasks { Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); - taskCounter.decrementAndGet(); + currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Flux<FileReadyMessage> consumeMessagesFromDmaap() { - final int currentNumberOfTasks = taskCounter.get(); - logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); - if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + /** + * Fetch more messages from the message router. This is done in a polling/blocking fashion. + */ + private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { + logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { return Flux.empty(); } - final DMaaPMessageConsumerTask messageConsumerTask = - new DMaaPMessageConsumerTask(this.applicationConfiguration); - return messageConsumerTask.execute() // + return createConsumerTask() // + .execute() // .onErrorResume(this::handleConsumeMessageFailure); } @@ -200,7 +208,25 @@ public class ScheduledTasks { try { Files.delete(localFile); } catch (Exception e) { - logger.warn("Could not delete file: {}, {}", localFile, e); + logger.trace("Could not delete file: {}", localFile); } } + + int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); + } + + DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); + } + + FileCollector createFileCollector(FileData fileData) { + return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), + new SftpClient(fileData.fileServerData())); + } + + DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); + } + } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index d2240f18..24b82fe6 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -61,37 +61,38 @@ class DataRouterPublisherTest { @BeforeAll public static void setUp() { - //@formatter:off + dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234) - .dmaapProtocol("https") - .dmaapUserName("DFC") - .dmaapUserPassword("DFC") - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) - .build(); + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) - .internalLocation("target/" + PM_FILE_NAME) - .compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec") - .fileFormatVersion("V10") - .build(); + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .name(PM_FILE_NAME) // + .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) // + .internalLocation("target/" + PM_FILE_NAME) // + .compression("gzip") // + .fileFormatType("org.3GPP.32.435#measCollec") // + .fileFormatVersion("V10") // + .build(); // appConfig = mock(AppConfig.class); - //@formatter:on + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); } @Test @@ -132,7 +133,7 @@ class DataRouterPublisherTest { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, nextHttpResponses); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java new file mode 100644 index 00000000..0662216b --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -0,0 +1,285 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class ScheduledTasksTest { + + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + + private AppConfig appConfig = mock(AppConfig.class); + private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig)); + + private int uniqueValue = 0; + private DMaaPMessageConsumerTask consumerMock; + private FileCollector fileCollectorMock; + private DataRouterPublisher dataRouterMock; + + @BeforeEach + private void setUp() { + DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // + + consumerMock = mock(DMaaPMessageConsumerTask.class); + fileCollectorMock = mock(FileCollector.class); + dataRouterMock = mock(DataRouterPublisher.class); + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull()); + doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher(); + } + + private MessageMetaData messageMetaData() { + return ImmutableMessageMetaData.builder() // + .productName("productName") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .changeIdentifier("") // + .changeType("") // + .build(); + } + + private FileData fileData(int instanceNumber) { + return ImmutableFileData.builder() // + .name("name" + instanceNumber) // + .fileFormatType("") // + .fileFormatVersion("") // + .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) // + .scheme(Scheme.FTPS) // + .compression("") // + .build(); + } + + private List<FileData> files(int size, boolean uniqueNames) { + List<FileData> list = new LinkedList<FileData>(); + for (int i = 0; i < size; ++i) { + if (uniqueNames) { + ++uniqueValue; + } + list.add(fileData(uniqueValue)); + } + return list; + } + + private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) { + MessageMetaData md = messageMetaData(); + return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md) + .files(files(numberOfFiles, uniqueNames)).build(); + } + + private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) { + List<FileReadyMessage> list = new LinkedList<FileReadyMessage>(); + for (int i = 0; i < numberOfEvents; ++i) { + list.add(createFileReadyMessage(filesPerEvent, uniqueNames)); + } + return Flux.fromIterable(list); + } + + private ConsumerDmaapModel consumerData() { + return ImmutableConsumerDmaapModel // + .builder() // + .productName("") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .name("") // + .location("") // + .internalLocation("internalLocation") // + .compression("") // + .fileFormatType("") // + .fileFormatVersion("") // + .build(); + } + + @Test + public void notingToConsume() { + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(Flux.empty()).when(consumerMock).execute(); + + testedObject.scheduleMainDatafileEventTask(); + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase() { + final int noOfEvents = 200; + final int noOfFilesPerEvent = 200; + final int noOfFiles = noOfEvents * noOfFilesPerEvent; + + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(noOfFiles) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_fetchFailedOnce() { + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + Mono<Object> error = Mono.error(new Exception("problem")); + + // First file collect will fail, 3 will succeed + doReturn(error, collectedFile, collectedFile, collectedFile) // + .when(fileCollectorMock) // + .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class)); + + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_publishFailedOnce() { + + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + + Mono<Object> error = Mono.error(new Exception("problem")); + // One publish will fail, the rest will succeed + doReturn(collectedFile, error, collectedFile, collectedFile) // + .when(dataRouterMock) // + .execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // 3 completed files + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase_sameFileNames() { + final int noOfEvents = 1; + final int noOfFilesPerEvent = 100; + + // 100 files with the same name + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(1) // 99 is skipped + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + +} |