aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-02-20 08:35:16 +0100
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-06 10:24:33 +0100
commitf41226f4c3f6675019f6b60508d211a0df102e8a (patch)
tree1a6dbc43050bd61b8400d429131119683c9c31f4
parent2bf13db00a2733d6c56cd8f0c0d26905586bd39f (diff)
Improvement of the parallelism
The reactive framework Scedulers uses to few threads. (the same number as the number of processors). That is too few for an io-intense application like this where CPU is not the limiting factor. Change-Id: Ia5f41e75716d309f47dce5f5273b739f7e6d136a Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java102
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java59
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java285
3 files changed, 379 insertions, 67 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 50f5431a..783c699c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -40,23 +40,23 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
+ private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of files from one PNF */
+ /** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
- // file in one VES
- // event
+ final FileCollector collectorTask;
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -68,16 +68,15 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger taskCounter = new AtomicInteger();
-
+ private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final Scheduler scheduler =
+ Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
- * @param xnfCollectorTask - second task
- * @param dmaapPublisherTask - third task
*/
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
@@ -90,20 +89,21 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- //@formatter:off
- consumeMessagesFromDmaap()
- .parallel() // Each FileReadyMessage in a separate thread
- .runOn(Schedulers.parallel())
- .flatMap(this::createFileCollectionTask)
- .filter(this::shouldBePublished)
- .doOnNext(fileData -> taskCounter.incrementAndGet())
- .flatMap(this::collectFileFromXnf)
- .flatMap(this::publishToDataRouter)
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
- .doOnNext(model -> taskCounter.decrementAndGet())
- .sequential()
- .subscribe(this::onSuccess, this::onError, this::onComplete);
- //@formatter:on
+ createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ Flux<ConsumerDmaapModel> createMainTask() {
+ return fetchMoreFileReadyMessages() //
+ .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .flatMap(this::createFileCollectionTask) //
+ .filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+ .flatMap(this::collectFileFromXnf) //
+ .flatMap(this::publishToDataRouter) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .sequential();
}
/**
@@ -125,13 +125,20 @@ public class ScheduledTasks {
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
+ private int getParallelism() {
+ if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
+ return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
+ } else {
+ return 1; // We need at least one rail/thread
+ }
+ }
+
private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
- fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
+ fileCollects.add(
+ new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -154,7 +161,7 @@ public class ScheduledTasks {
logger.error("File fetching failed: {}", localFileName);
deleteFile(localFileName);
alreadyPublishedFiles.remove(localFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -162,7 +169,7 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+ DataRouterPublisher publisherTask = createDataRouterPublisher();
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
@@ -173,20 +180,21 @@ public class ScheduledTasks {
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
- final int currentNumberOfTasks = taskCounter.get();
- logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
- if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ /**
+ * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+ */
+ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
return Flux.empty();
}
- final DMaaPMessageConsumerTask messageConsumerTask =
- new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute() //
+ return createConsumerTask() //
+ .execute() //
.onErrorResume(this::handleConsumeMessageFailure);
}
@@ -200,7 +208,25 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.warn("Could not delete file: {}, {}", localFile, e);
+ logger.trace("Could not delete file: {}", localFile);
}
}
+
+ int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
+ }
+
+ DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ }
+
+ FileCollector createFileCollector(FileData fileData) {
+ return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
+ }
+
+ DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
+ }
+
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d2240f18..24b82fe6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -61,37 +61,38 @@ class DataRouterPublisherTest {
@BeforeAll
public static void setUp() {
- //@formatter:off
+
dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234)
- .dmaapProtocol("https")
- .dmaapUserName("DFC")
- .dmaapUserPassword("DFC")
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
- .build();
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME)
- .internalLocation("target/" + PM_FILE_NAME)
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
+ .internalLocation("target/" + PM_FILE_NAME) //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
+ .build(); //
appConfig = mock(AppConfig.class);
- //@formatter:on
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
}
@Test
@@ -132,7 +133,7 @@ class DataRouterPublisherTest {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
new file mode 100644
index 00000000..0662216b
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -0,0 +1,285 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ScheduledTasksTest {
+
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+
+ private AppConfig appConfig = mock(AppConfig.class);
+ private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+
+ private int uniqueValue = 0;
+ private DMaaPMessageConsumerTask consumerMock;
+ private FileCollector fileCollectorMock;
+ private DataRouterPublisher dataRouterMock;
+
+ @BeforeEach
+ private void setUp() {
+ DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
+
+ consumerMock = mock(DMaaPMessageConsumerTask.class);
+ fileCollectorMock = mock(FileCollector.class);
+ dataRouterMock = mock(DataRouterPublisher.class);
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+ }
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder() //
+ .productName("productName") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .changeIdentifier("") //
+ .changeType("") //
+ .build();
+ }
+
+ private FileData fileData(int instanceNumber) {
+ return ImmutableFileData.builder() //
+ .name("name" + instanceNumber) //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
+ .scheme(Scheme.FTPS) //
+ .compression("") //
+ .build();
+ }
+
+ private List<FileData> files(int size, boolean uniqueNames) {
+ List<FileData> list = new LinkedList<FileData>();
+ for (int i = 0; i < size; ++i) {
+ if (uniqueNames) {
+ ++uniqueValue;
+ }
+ list.add(fileData(uniqueValue));
+ }
+ return list;
+ }
+
+ private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
+ MessageMetaData md = messageMetaData();
+ return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
+ .files(files(numberOfFiles, uniqueNames)).build();
+ }
+
+ private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
+ List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
+ for (int i = 0; i < numberOfEvents; ++i) {
+ list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
+ }
+ return Flux.fromIterable(list);
+ }
+
+ private ConsumerDmaapModel consumerData() {
+ return ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation("internalLocation") //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ }
+
+ @Test
+ public void notingToConsume() {
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(Flux.empty()).when(consumerMock).execute();
+
+ testedObject.scheduleMainDatafileEventTask();
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase() {
+ final int noOfEvents = 200;
+ final int noOfFilesPerEvent = 200;
+ final int noOfFiles = noOfEvents * noOfFilesPerEvent;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(noOfFiles) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_fetchFailedOnce() {
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ Mono<Object> error = Mono.error(new Exception("problem"));
+
+ // First file collect will fail, 3 will succeed
+ doReturn(error, collectedFile, collectedFile, collectedFile) //
+ .when(fileCollectorMock) //
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_publishFailedOnce() {
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+
+ Mono<Object> error = Mono.error(new Exception("problem"));
+ // One publish will fail, the rest will succeed
+ doReturn(collectedFile, error, collectedFile, collectedFile) //
+ .when(dataRouterMock) //
+ .execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) // 3 completed files
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase_sameFileNames() {
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 100;
+
+ // 100 files with the same name
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(1) // 99 is skipped
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+
+}