aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTony Hansen <tony@att.com>2019-03-08 19:23:43 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-08 19:23:43 +0000
commit1ece9eeb3b0720253335e201fc1a643ad0f6d324 (patch)
tree11d1f292c0352bd83739046884eac34ba7a6a778
parentc6c15cad13e0869cbb2e83c1dda98d31c3104907 (diff)
parentf41226f4c3f6675019f6b60508d211a0df102e8a (diff)
Merge "Improvement of the parallelism"
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java102
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java59
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java285
3 files changed, 379 insertions, 67 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 50f5431a..783c699c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -40,23 +40,23 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
+ private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of files from one PNF */
+ /** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
- // file in one VES
- // event
+ final FileCollector collectorTask;
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -68,16 +68,15 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger taskCounter = new AtomicInteger();
-
+ private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final Scheduler scheduler =
+ Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
- * @param xnfCollectorTask - second task
- * @param dmaapPublisherTask - third task
*/
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
@@ -90,20 +89,21 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- //@formatter:off
- consumeMessagesFromDmaap()
- .parallel() // Each FileReadyMessage in a separate thread
- .runOn(Schedulers.parallel())
- .flatMap(this::createFileCollectionTask)
- .filter(this::shouldBePublished)
- .doOnNext(fileData -> taskCounter.incrementAndGet())
- .flatMap(this::collectFileFromXnf)
- .flatMap(this::publishToDataRouter)
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
- .doOnNext(model -> taskCounter.decrementAndGet())
- .sequential()
- .subscribe(this::onSuccess, this::onError, this::onComplete);
- //@formatter:on
+ createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ Flux<ConsumerDmaapModel> createMainTask() {
+ return fetchMoreFileReadyMessages() //
+ .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .flatMap(this::createFileCollectionTask) //
+ .filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+ .flatMap(this::collectFileFromXnf) //
+ .flatMap(this::publishToDataRouter) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .sequential();
}
/**
@@ -125,13 +125,20 @@ public class ScheduledTasks {
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
+ private int getParallelism() {
+ if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
+ return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
+ } else {
+ return 1; // We need at least one rail/thread
+ }
+ }
+
private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
- fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
+ fileCollects.add(
+ new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -154,7 +161,7 @@ public class ScheduledTasks {
logger.error("File fetching failed: {}", localFileName);
deleteFile(localFileName);
alreadyPublishedFiles.remove(localFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -162,7 +169,7 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+ DataRouterPublisher publisherTask = createDataRouterPublisher();
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
@@ -173,20 +180,21 @@ public class ScheduledTasks {
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
- final int currentNumberOfTasks = taskCounter.get();
- logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
- if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ /**
+ * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+ */
+ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
return Flux.empty();
}
- final DMaaPMessageConsumerTask messageConsumerTask =
- new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute() //
+ return createConsumerTask() //
+ .execute() //
.onErrorResume(this::handleConsumeMessageFailure);
}
@@ -200,7 +208,25 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.warn("Could not delete file: {}, {}", localFile, e);
+ logger.trace("Could not delete file: {}", localFile);
}
}
+
+ int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
+ }
+
+ DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ }
+
+ FileCollector createFileCollector(FileData fileData) {
+ return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
+ }
+
+ DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
+ }
+
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d2240f18..24b82fe6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -61,37 +61,38 @@ class DataRouterPublisherTest {
@BeforeAll
public static void setUp() {
- //@formatter:off
+
dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234)
- .dmaapProtocol("https")
- .dmaapUserName("DFC")
- .dmaapUserPassword("DFC")
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
- .build();
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME)
- .internalLocation("target/" + PM_FILE_NAME)
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
+ .internalLocation("target/" + PM_FILE_NAME) //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
+ .build(); //
appConfig = mock(AppConfig.class);
- //@formatter:on
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
}
@Test
@@ -132,7 +133,7 @@ class DataRouterPublisherTest {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
new file mode 100644
index 00000000..0662216b
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -0,0 +1,285 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ScheduledTasksTest {
+
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+
+ private AppConfig appConfig = mock(AppConfig.class);
+ private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+
+ private int uniqueValue = 0;
+ private DMaaPMessageConsumerTask consumerMock;
+ private FileCollector fileCollectorMock;
+ private DataRouterPublisher dataRouterMock;
+
+ @BeforeEach
+ private void setUp() {
+ DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
+
+ consumerMock = mock(DMaaPMessageConsumerTask.class);
+ fileCollectorMock = mock(FileCollector.class);
+ dataRouterMock = mock(DataRouterPublisher.class);
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+ }
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder() //
+ .productName("productName") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .changeIdentifier("") //
+ .changeType("") //
+ .build();
+ }
+
+ private FileData fileData(int instanceNumber) {
+ return ImmutableFileData.builder() //
+ .name("name" + instanceNumber) //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
+ .scheme(Scheme.FTPS) //
+ .compression("") //
+ .build();
+ }
+
+ private List<FileData> files(int size, boolean uniqueNames) {
+ List<FileData> list = new LinkedList<FileData>();
+ for (int i = 0; i < size; ++i) {
+ if (uniqueNames) {
+ ++uniqueValue;
+ }
+ list.add(fileData(uniqueValue));
+ }
+ return list;
+ }
+
+ private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
+ MessageMetaData md = messageMetaData();
+ return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
+ .files(files(numberOfFiles, uniqueNames)).build();
+ }
+
+ private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
+ List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
+ for (int i = 0; i < numberOfEvents; ++i) {
+ list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
+ }
+ return Flux.fromIterable(list);
+ }
+
+ private ConsumerDmaapModel consumerData() {
+ return ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation("internalLocation") //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ }
+
+ @Test
+ public void notingToConsume() {
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(Flux.empty()).when(consumerMock).execute();
+
+ testedObject.scheduleMainDatafileEventTask();
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase() {
+ final int noOfEvents = 200;
+ final int noOfFilesPerEvent = 200;
+ final int noOfFiles = noOfEvents * noOfFilesPerEvent;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(noOfFiles) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_fetchFailedOnce() {
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ Mono<Object> error = Mono.error(new Exception("problem"));
+
+ // First file collect will fail, 3 will succeed
+ doReturn(error, collectedFile, collectedFile, collectedFile) //
+ .when(fileCollectorMock) //
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_publishFailedOnce() {
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+
+ Mono<Object> error = Mono.error(new Exception("problem"));
+ // One publish will fail, the rest will succeed
+ doReturn(collectedFile, error, collectedFile, collectedFile) //
+ .when(dataRouterMock) //
+ .execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) // 3 completed files
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase_sameFileNames() {
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 100;
+
+ // 100 files with the same name
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(1) // 99 is skipped
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+
+}