aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java57
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java2
2 files changed, 40 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 3e4481dd..ae8c6215 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -33,19 +34,20 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int NUMBER_OF_WORKER_THREADS = 200;
private static final int MAX_TASKS_FOR_POLLING = 50;
private static final long DATA_ROUTER_MAX_RETRIES = 5;
private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
@@ -56,8 +58,9 @@ public class ScheduledTasks {
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
- PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
+ PublishedFileCache publishedFilesCache = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
@@ -74,13 +77,27 @@ public class ScheduledTasks {
*/
public void executeDatafileMainTask() {
try {
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info(
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size());
+ return;
+ }
+
+ currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
.subscribe(model -> onSuccess(model, context), //
- thr -> onError(thr, context), //
- () -> onComplete(context));
+ throwable -> {
+ onError(throwable, context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ }, //
+ () -> {
+ onComplete(context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ });
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
@@ -88,13 +105,13 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel
.runOn(scheduler) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
- .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> fetchFile(fileData, contextMap)) //
- .flatMap(model -> publishToDataRouter(model, contextMap)) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) //
+ .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
@@ -104,7 +121,7 @@ public class ScheduledTasks {
* called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
- alreadyPublishedFiles.purge(now);
+ publishedFilesCache.purge(now);
}
protected PublishedChecker createPublishedChecker() {
@@ -149,9 +166,13 @@ public class ScheduledTasks {
private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
Path localFilePath = fileData.getLocalFilePath();
- if (alreadyPublishedFiles.put(localFilePath) == null) {
+ if (publishedFilesCache.put(localFilePath) == null) {
result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
}
+ if (!result) {
+ currentNumberOfTasks.decrementAndGet();
+ }
+
return result;
}
@@ -167,7 +188,7 @@ public class ScheduledTasks {
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFilePath, contextMap);
- alreadyPublishedFiles.remove(localFilePath);
+ publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -187,7 +208,7 @@ public class ScheduledTasks {
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
- alreadyPublishedFiles.remove(internalFileName);
+ publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -196,11 +217,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
- logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
- return Flux.empty();
- }
+ logger.info(
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 24bb759f..e07ed02d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -293,8 +293,10 @@ public class ScheduledTasksTest {
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
+ verifyNoMoreInteractions(dataRouterMock);
}
}