summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java102
1 files changed, 64 insertions, 38 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 50f5431a..783c699c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -40,23 +40,23 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
+ private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of files from one PNF */
+ /** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
- // file in one VES
- // event
+ final FileCollector collectorTask;
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -68,16 +68,15 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger taskCounter = new AtomicInteger();
-
+ private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final Scheduler scheduler =
+ Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
- * @param xnfCollectorTask - second task
- * @param dmaapPublisherTask - third task
*/
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
@@ -90,20 +89,21 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- //@formatter:off
- consumeMessagesFromDmaap()
- .parallel() // Each FileReadyMessage in a separate thread
- .runOn(Schedulers.parallel())
- .flatMap(this::createFileCollectionTask)
- .filter(this::shouldBePublished)
- .doOnNext(fileData -> taskCounter.incrementAndGet())
- .flatMap(this::collectFileFromXnf)
- .flatMap(this::publishToDataRouter)
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
- .doOnNext(model -> taskCounter.decrementAndGet())
- .sequential()
- .subscribe(this::onSuccess, this::onError, this::onComplete);
- //@formatter:on
+ createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ Flux<ConsumerDmaapModel> createMainTask() {
+ return fetchMoreFileReadyMessages() //
+ .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .flatMap(this::createFileCollectionTask) //
+ .filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+ .flatMap(this::collectFileFromXnf) //
+ .flatMap(this::publishToDataRouter) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .sequential();
}
/**
@@ -125,13 +125,20 @@ public class ScheduledTasks {
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
+ private int getParallelism() {
+ if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
+ return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
+ } else {
+ return 1; // We need at least one rail/thread
+ }
+ }
+
private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
- fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
+ fileCollects.add(
+ new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -154,7 +161,7 @@ public class ScheduledTasks {
logger.error("File fetching failed: {}", localFileName);
deleteFile(localFileName);
alreadyPublishedFiles.remove(localFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -162,7 +169,7 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+ DataRouterPublisher publisherTask = createDataRouterPublisher();
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
@@ -173,20 +180,21 @@ public class ScheduledTasks {
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
- taskCounter.decrementAndGet();
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
- final int currentNumberOfTasks = taskCounter.get();
- logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
- if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ /**
+ * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+ */
+ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
return Flux.empty();
}
- final DMaaPMessageConsumerTask messageConsumerTask =
- new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute() //
+ return createConsumerTask() //
+ .execute() //
.onErrorResume(this::handleConsumeMessageFailure);
}
@@ -200,7 +208,25 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.warn("Could not delete file: {}, {}", localFile, e);
+ logger.trace("Could not delete file: {}", localFile);
}
}
+
+ int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
+ }
+
+ DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ }
+
+ FileCollector createFileCollector(FileData fileData) {
+ return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
+ }
+
+ DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
+ }
+
}