summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
authorHenrik Andersson <henrik.b.andersson@est.tech>2019-05-22 10:15:51 +0000
committerGerrit Code Review <gerrit@onap.org>2019-05-22 10:15:51 +0000
commit2187fc6e6b4b4783389289fb87bd6a3676ad2c72 (patch)
treeeeb3c807c3509253f079538903cfb42251991a8f /datafile-app-server
parent511eaecfebcc89346756fe43ef43e6c106602cf6 (diff)
parent7b84c15301abbca5ae586a6cbfcf482570a8c35f (diff)
Merge "Fix for robustness"
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java17
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java1
3 files changed, 16 insertions, 9 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 5bcf18c6..3a3eb3aa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -107,10 +107,9 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
- return createMessages(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
- .orElseGet(JsonObject::new)))));
+ return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new))));
}
/**
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 26d47b51..b5fa0c24 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -59,6 +59,7 @@ public class ScheduledTasks {
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
@@ -78,10 +79,11 @@ public class ScheduledTasks {
*/
public void executeDatafileMainTask() {
try {
- if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) {
logger.info(
- "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}",
- getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size());
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(),
+ threadPoolQueueSize.get());
return;
}
@@ -106,18 +108,19 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(this::publishToDataRouter,false, 1, 1) //
+ .flatMap(this::publishToDataRouter, false, 1, 1) //
.doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
.doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
-
}
private class FileDataWithContext {
@@ -153,6 +156,10 @@ public class ScheduledTasks {
return currentNumberOfSubscriptions.get();
}
+ public int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 1a7db424..0d5a4231 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -202,6 +202,7 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ assertEquals(0, testedObject.getThreadPoolQueueSize());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());