aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-04-11 15:36:48 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-04-11 15:36:48 +0000
commit77793c7a822e8bb7771bfb0cf9db2ce4dce865d4 (patch)
tree4d85053c31d5e1562bd22b40c05d26d6420fb713
parent314a1e4310545e5b70ff64e328f3e7eae281c5b4 (diff)
Robustness issue
Limiting the number of paralell threads when retying of ftp or publish. Previously there was no upper limit on how many paralel threads that could be started by retry attempts. For instance, a worker thread with 100 new files would start 100 new threads when the ftt server was unavailable. Change-Id: Ia8792f03ea55c0c467ef248ddc9d59187c06a946 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java57
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java2
3 files changed, 41 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 257c356c..2b3a0ef6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -63,7 +63,7 @@ public class PublishedFileCache {
}
}
- int size() {
+ public int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 037f495f..c047f15e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -33,19 +34,20 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int NUMBER_OF_WORKER_THREADS = 200;
private static final int MAX_TASKS_FOR_POLLING = 50;
private static final long DATA_ROUTER_MAX_RETRIES = 5;
private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
@@ -56,8 +58,9 @@ public class ScheduledTasks {
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
- PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
+ PublishedFileCache publishedFilesCache = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
@@ -74,13 +77,27 @@ public class ScheduledTasks {
*/
public void executeDatafileMainTask() {
try {
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info(
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size());
+ return;
+ }
+
+ currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
.subscribe(model -> onSuccess(model, context), //
- thr -> onError(thr, context), //
- () -> onComplete(context));
+ throwable -> {
+ onError(throwable, context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ }, //
+ () -> {
+ onComplete(context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ });
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
@@ -88,13 +105,13 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel
.runOn(scheduler) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
- .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> fetchFile(fileData, contextMap)) //
- .flatMap(model -> publishToDataRouter(model, contextMap)) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) //
+ .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
@@ -104,7 +121,7 @@ public class ScheduledTasks {
* called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
- alreadyPublishedFiles.purge(now);
+ publishedFilesCache.purge(now);
}
protected PublishedChecker createPublishedChecker() {
@@ -145,9 +162,13 @@ public class ScheduledTasks {
private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
Path localFilePath = fileData.getLocalFilePath();
- if (alreadyPublishedFiles.put(localFilePath) == null) {
+ if (publishedFilesCache.put(localFilePath) == null) {
result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
}
+ if (!result) {
+ currentNumberOfTasks.decrementAndGet();
+ }
+
return result;
}
@@ -163,7 +184,7 @@ public class ScheduledTasks {
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFilePath, contextMap);
- alreadyPublishedFiles.remove(localFilePath);
+ publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -183,7 +204,7 @@ public class ScheduledTasks {
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
- alreadyPublishedFiles.remove(internalFileName);
+ publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
@@ -192,11 +213,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
- logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
- return Flux.empty();
- }
+ logger.info(
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 24bb759f..e07ed02d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -293,8 +293,10 @@ public class ScheduledTasksTest {
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
+ verifyNoMoreInteractions(dataRouterMock);
}
}