Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java')
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 86
1 file changed, 43 insertions(+), 43 deletions(-)
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 300ca601..bc73ddb2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -35,12 +36,12 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
-
/**
* This implements the main flow of the data file collector. Fetch file ready events from the
* message router, fetch new files from the PNF publish these in the data router.
@@ -84,10 +85,10 @@ public class ScheduledTasks {
try {
if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) {
logger.info(
- "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, "
- + "published files: {}, number of queued VES events: {}",
- getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(),
- threadPoolQueueSize.get());
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, "
+ + "published files: {}, number of queued VES events: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(),
+ threadPoolQueueSize.get());
return;
}
if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
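
The guard reindented in the hunk above rate-limits polling: before consuming new file-ready events, the task checks the current task count and the thread-pool queue size and simply returns when either is too high. A minimal stand-alone sketch of that idiom follows; the limit and field names are stand-ins, not the actual ScheduledTasks members.

import java.util.concurrent.atomic.AtomicInteger;

class PollingGuardSketch {
    private static final int MAX_TASKS = 50;                       // assumed limit, analogous to MAX_TASKS_FOR_POLLING
    private final AtomicInteger tasksInFlight = new AtomicInteger();
    private final AtomicInteger queuedEvents = new AtomicInteger();

    void poll() {
        if (tasksInFlight.get() > MAX_TASKS || queuedEvents.get() > 0) {
            // Skip this polling round; the previous batch has not drained yet.
            return;
        }
        // ... consume new file-ready events ...
    }
}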
@@ -99,15 +100,15 @@ public class ScheduledTasks {
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
createMainTask(context) //
- .subscribe(ScheduledTasks::onSuccess, //
- throwable -> {
- onError(throwable, context);
- currentNumberOfSubscriptions.decrementAndGet();
- }, //
- () -> {
- onComplete(context);
- currentNumberOfSubscriptions.decrementAndGet();
- });
+ .subscribe(ScheduledTasks::onSuccess, //
+ throwable -> {
+ onError(throwable, context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ }, //
+ () -> {
+ onComplete(context);
+ currentNumberOfSubscriptions.decrementAndGet();
+ });
} catch (Exception e) {
logger.error("Unexpected exception: {}", e.toString(), e);
}
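
The reindented subscribe(...) call above uses the three-argument Flux.subscribe overload: an onNext consumer, an onError consumer, and an onComplete runnable, with the subscription counter decremented on both terminal paths. A minimal, self-contained sketch of the same pattern; the counter and messages here are illustrative, not the collector's own.

import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class SubscribeSketch {
    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger(1);        // pretend one subscription was registered
        Flux.just("file1.xml", "file2.xml")
            .subscribe(
                item -> System.out.println("published: " + item),  // onNext
                throwable -> {                                      // onError: log and release the slot
                    System.err.println("main task failed: " + throwable);
                    subscriptions.decrementAndGet();
                },
                () -> {                                             // onComplete: release the slot
                    System.out.println("main task completed");
                    subscriptions.decrementAndGet();
                });
    }
}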
@@ -115,21 +116,21 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
- .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
- .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
- .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
- .runOn(scheduler) //
- .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
- .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
- .flatMap(fileData -> createMdcContext(fileData, context)) //
- .filter(this::isFeedConfigured) //
- .filter(this::shouldBePublished) //
- .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(this::publishToDataRouter, false, 1, 1) //
- .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
- .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
- .sequential();
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
+ .flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
+ .filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+ .flatMap(this::fetchFile, false, 1, 1) //
+ .flatMap(this::publishToDataRouter, false, 1, 1) //
+ .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
+ .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
+ .sequential();
}
private class FileDataWithContext {
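
The createMainTask pipeline reformatted in the hunk above is the heart of the class: events are fanned out onto NUMBER_OF_WORKER_THREADS rails with parallel(...).runOn(scheduler), each file is fetched and published with flatMap(..., false, 1, 1) so per-rail concurrency stays at one, and sequential() merges the rails back into a single stream. A stripped-down sketch of that shape, with the fetch and publish stages reduced to a map and the thread count chosen arbitrarily:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelPipelineSketch {
    public static void main(String[] args) {
        Flux.range(1, 8)                               // stand-in for the stream of file-ready events
            .parallel(4)                               // split onto 4 rails
            .runOn(Schedulers.newParallel("worker"))   // run each rail on its own worker thread
            .map(event -> "processed-" + event + " on " + Thread.currentThread().getName())
            .sequential()                              // merge the rails back into a single Flux
            .doOnNext(System.out::println)
            .blockLast();                              // block only so the demo prints before exiting
    }
}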
@@ -157,7 +158,6 @@ public class ScheduledTasks {
return this.counters;
}
-
protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -213,7 +213,7 @@ public class ScheduledTasks {
return true;
} else {
logger.info("No feed is configured for: {}, file ignored: {}",
- fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
return false;
}
}
@@ -223,7 +223,7 @@ public class ScheduledTasks {
if (publishedFilesCache.put(localFilePath) == null) {
try {
boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
- fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
return result;
} catch (DatafileTaskException e) {
logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
@@ -237,9 +237,9 @@ public class ScheduledTasks {
private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
MDC.setContextMap(fileData.context);
return createFileCollector() //
- .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT,
- fileData.context) //
- .onErrorResume(exception -> handleFetchFileFailure(fileData));
+ .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT,
+ fileData.context) //
+ .onErrorResume(exception -> handleFetchFileFailure(fileData));
}
private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) {
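
fetchFile above wraps the collector call in onErrorResume so that a failed transfer is diverted to handleFetchFileFailure instead of terminating the whole pipeline. A minimal sketch of that fallback pattern; returning an empty Mono simply drops the failed element so later files keep flowing (the error message and handler body are illustrative):

import reactor.core.publisher.Mono;

public class OnErrorResumeSketch {
    public static void main(String[] args) {
        Mono<String> collected = Mono.<String>error(new RuntimeException("file transfer failed"))
            .onErrorResume(exception -> {
                // Fallback path: log, count the failure, and swallow the element.
                System.err.println("fetch failed: " + exception.getMessage());
                return Mono.empty();
            });

        // blockOptional() is used only to show the outcome; the element was dropped.
        System.out.println("result present: " + collected.blockOptional().isPresent());
    }
}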
@@ -257,8 +257,8 @@ public class ScheduledTasks {
MDC.setContextMap(publishInfo.getContext());
return createDataRouterPublisher()
- .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
- .onErrorResume(exception -> handlePublishFailure(publishInfo));
+ .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
+ .onErrorResume(exception -> handlePublishFailure(publishInfo));
}
private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) {
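
publishToDataRouter above passes DATA_ROUTER_MAX_RETRIES and DATA_ROUTER_INITIAL_RETRY_TIMEOUT into publishFile, so the retry loop lives inside the publisher and is not visible in this diff. One common way to express such a policy in Reactor is retryWhen with exponential backoff; the sketch below shows that generic idiom under the assumption of Reactor 3.3+ (reactor.util.retry.Retry) and is not the publisher's actual code.

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryBackoffSketch {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = Mono.fromCallable(() -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("data router returned 5xx");   // fail the first two attempts
                }
                return "published";
            })
            // Up to 3 retries, starting at 100 ms and doubling between attempts.
            .retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
            .block();
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}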
@@ -277,15 +277,15 @@ public class ScheduledTasks {
*/
Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
- + "number of subscriptions: {}",
- getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
+ getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
try {
return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
} catch (Exception e) {
logger.error("Could not create message consumer task", e);
return Flux.empty();
@@ -295,7 +295,7 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
- this.applicationConfiguration.getDmaapConsumerConfiguration());
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
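
The last two hunks show the MDC handling that runs through the whole class: the trace context is captured once with MDC.getCopyOfContextMap() on the polling thread and re-installed with MDC.setContextMap(context) inside every callback that may run on a worker thread, so log lines stay correlated. A small sketch of that hand-off, assuming Reactor 3.3+ for Schedulers.boundedElastic(); the key name is illustrative.

import java.util.Map;
import org.slf4j.MDC;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class MdcHandOffSketch {
    public static void main(String[] args) {
        MDC.put("RequestID", "example-trace-id");           // set on the calling thread
        Map<String, String> context = MDC.getCopyOfContextMap();

        Mono.fromRunnable(() -> {
                MDC.setContextMap(context);                  // restore on the worker thread
                System.out.println(Thread.currentThread().getName()
                        + " sees RequestID=" + MDC.get("RequestID"));
            })
            .subscribeOn(Schedulers.boundedElastic())
            .block();
    }
}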