summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/config/application.yaml1
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java12
2 files changed, 11 insertions, 2 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index cef185c6..b66f7b6e 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -14,6 +14,7 @@ logging:
ROOT: ERROR
org.springframework: ERROR
org.springframework.data: ERROR
+ org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.dcaegen2.collectors.datafile: ERROR
file: opt/log/application.log
app:
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 171dd024..c465fe94 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -59,11 +60,18 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
-
+ //@formatter:off
consumeFromDmaapMessage()
+ .publishOn(Schedulers.parallel())
+ .cache()
.doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+ .flatMap(this::collectFilesFromXnf)
+ .retry(3)
+ .cache()
+ .flatMap(this::publishToDmaapConfiguration)
+ .retry(3)
.subscribe(this::onSuccess, this::onError, this::onComplete);
+ //@formatter:on
}
private void onComplete() {