diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 99b2d918..300ca601 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -57,11 +58,12 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfTasks; private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); - private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions; private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); + private Counters counters = new Counters(); /** * Constructor for task registration in Datafile Workflow. @@ -71,6 +73,8 @@ public class ScheduledTasks { @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { this.applicationConfiguration = applicationConfiguration; + this.currentNumberOfTasks = counters.getCurrentNumberOfTasks(); + this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions(); } /** @@ -112,6 +116,7 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // @@ -148,32 +153,21 @@ public class ScheduledTasks { return new PublishedChecker(applicationConfiguration); } - public int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); + public Counters getCounters() { + return this.counters; } - public int publishedFilesCacheSize() { - return publishedFilesCache.size(); - } - - public int getCurrentNumberOfSubscriptions() { - return currentNumberOfSubscriptions.get(); - } - - public int getThreadPoolQueueSize() { - return this.threadPoolQueueSize.get(); - } protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); + return new FileCollector(applicationConfiguration, counters); } protected DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); + return new DataRouterPublisher(applicationConfiguration, counters); } private static void onComplete(Map<String, String> contextMap) { @@ -181,6 +175,22 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } + int publishedFilesCacheSize() { + return publishedFilesCache.size(); + } + + int getCurrentNumberOfTasks() { + return this.currentNumberOfTasks.get(); + } + + int getCurrentNumberOfSubscriptions() { + return this.currentNumberOfSubscriptions.get(); + } + + int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); @@ -239,6 +249,7 @@ public class ScheduledTasks { deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedFtp(); return Mono.empty(); } @@ -257,6 +268,7 @@ public class ScheduledTasks { deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedPublish(); return Mono.empty(); } |