aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java156
1 files changed, 63 insertions, 93 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index d41e5c25..b4096c73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,8 +20,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
@@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
-
- /** Data needed for fetching of one file. */
- private class FileCollectionData {
- final FileData fileData;
- final MessageMetaData metaData;
-
- FileCollectionData(FileData fd, MessageMetaData metaData) {
- this.fileData = fd;
- this.metaData = metaData;
- }
- }
+ private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int MAX_TASKS_FOR_POLLING = 50;
+ private static final long DATA_ROUTER_MAX_RETRIES = 5;
+ private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
+ private static final long FILE_TRANSFER_MAX_RETRIES = 3;
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
- private final Scheduler scheduler =
- Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
@@ -86,21 +75,25 @@ public class ScheduledTasks {
* Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ try {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Execution of tasks was registered");
+ applicationConfiguration.loadConfigurationFromFile();
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
+ } catch (Exception e) {
+ logger.error("Unexpected exception: ", e);
+ }
}
Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .flatMap(this::createFileCollectionTask) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
@@ -114,63 +107,61 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile tasks have been completed");
+ protected PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
}
- private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile consumed tasks {}", model.getInternalLocation());
+ protected int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ protected DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- private int getParallelism() {
- if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
- return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
- } else {
- return 1; // We need at least one rail/thread
- }
+ protected FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
+ }
+
+ protected DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
}
- private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
- List<FileCollectionData> fileCollects = new ArrayList<>();
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Datafile tasks have been completed");
+ }
- for (FileData fileData : availableFiles.files()) {
- fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
- }
- return Flux.fromIterable(fileCollects);
+ private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info("Datafile file published {}", model.getInternalLocation());
}
- private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
+ }
+
+ private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
- Path localFileName = task.fileData.getLocalFileName();
+ Path localFileName = fileData.getLocalFileName();
if (alreadyPublishedFiles.put(localFileName) == null) {
result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
- Map<String, String> contextMap) {
- final long maxNUmberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
+ private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
+ .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
- logger.error("File fetching failed: {}", localFileName);
+ logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
@@ -178,19 +169,16 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- final long maxNumberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
-
MdcVariables.setMdcContextMap(contextMap);
- return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
+
+ return createDataRouterPublisher()
+ .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
@@ -202,8 +190,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
return Flux.empty();
}
@@ -215,7 +204,8 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("Polling for file ready message failed, exception: {}", exception);
+ logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
@@ -229,24 +219,4 @@ public class ScheduledTasks {
}
}
- PublishedChecker createPublishedChecker() {
- return new PublishedChecker(applicationConfiguration);
- }
-
- int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
- }
-
- DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
- }
-
- FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
- }
-
- DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
- }
-
}