aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java50
1 files changed, 31 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 99b2d918..300ca601 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -57,11 +58,12 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger currentNumberOfTasks;
private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
- private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
+ private final AtomicInteger currentNumberOfSubscriptions;
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
+ private Counters counters = new Counters();
/**
* Constructor for task registration in Datafile Workflow.
@@ -71,6 +73,8 @@ public class ScheduledTasks {
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
this.applicationConfiguration = applicationConfiguration;
+ this.currentNumberOfTasks = counters.getCurrentNumberOfTasks();
+ this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions();
}
/**
@@ -112,6 +116,7 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
@@ -148,32 +153,21 @@ public class ScheduledTasks {
return new PublishedChecker(applicationConfiguration);
}
- public int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
+ public Counters getCounters() {
+ return this.counters;
}
- public int publishedFilesCacheSize() {
- return publishedFilesCache.size();
- }
-
- public int getCurrentNumberOfSubscriptions() {
- return currentNumberOfSubscriptions.get();
- }
-
- public int getThreadPoolQueueSize() {
- return this.threadPoolQueueSize.get();
- }
protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
+ return new FileCollector(applicationConfiguration, counters);
}
protected DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
+ return new DataRouterPublisher(applicationConfiguration, counters);
}
private static void onComplete(Map<String, String> contextMap) {
@@ -181,6 +175,22 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
+ int publishedFilesCacheSize() {
+ return publishedFilesCache.size();
+ }
+
+ int getCurrentNumberOfTasks() {
+ return this.currentNumberOfTasks.get();
+ }
+
+ int getCurrentNumberOfSubscriptions() {
+ return this.currentNumberOfSubscriptions.get();
+ }
+
+ int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
@@ -239,6 +249,7 @@ public class ScheduledTasks {
deleteFile(localFilePath, fileData.context);
publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedFtp();
return Mono.empty();
}
@@ -257,6 +268,7 @@ public class ScheduledTasks {
deleteFile(internalFileName, publishInfo.getContext());
publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedPublish();
return Mono.empty();
}