aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorRehanRaza <muhammad.rehan.raza@est.tech>2019-03-08 20:16:04 +0000
committerRehanRaza <muhammad.rehan.raza@est.tech>2019-03-08 20:16:04 +0000
commitf821c0328fc0d9d579a58c80b0f86be53c896541 (patch)
tree816ba93b4c21a55fb2109fddda4e8c4eebb26ea3 /datafile-app-server/src/main/java
parent1ece9eeb3b0720253335e201fc1a643ad0f6d324 (diff)
DFC logging according to ONAP specification
Change-Id: I6fe18ce3bdbc6d0b1cf5c5e65534cab694cfb898 Issue-ID: DCAEGEN2-1305 Signed-off-by: RehanRaza <muhammad.rehan.raza@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java17
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java66
7 files changed, 129 insertions, 59 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 9838afb1..208e8a43 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -17,15 +17,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig {
}
- protected void runTask() {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel())
+ protected void runTask(Map<String, String> contextMap) {
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
.subscribe(this::parsingConfigSuccess, this::parsingConfigError);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 106725ce..8f417261 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,7 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,14 +13,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END==========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
@@ -38,7 +40,8 @@ class EnvironmentProcessor {
private EnvironmentProcessor() {
}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index bc21f96c..b4dc6353 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,22 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -45,7 +53,11 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
+ private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -68,6 +80,9 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.clear();
return Mono.defer(() -> Mono
.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
@@ -80,10 +95,20 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
+ String requestId = MDC.get(REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ MDC.put(REQUEST_ID, UUID.randomUUID().toString());
+ }
+ String invocationId = MDC.get(INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
+ }
+ contextMap = MDC.getCopyOfContextMap();
+ logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
@@ -94,4 +119,4 @@ public class SchedulerConfig {
}
}
-}
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 825308e7..a21ed041 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,16 +18,24 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -50,9 +58,24 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ public Mono<ResponseEntity<String>> startTasks() {
+ logger.trace("Start scheduling worker request");
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ }
+
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
- public Mono<ResponseEntity<String>> startTasks() {
+ public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
+ String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(REQUEST_ID, requestId);
+ MDC.put(INVOCATION_ID, invocationId);
logger.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 4c0dcce5..57edc364 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -17,16 +17,16 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-
import reactor.core.publisher.Mono;
/**
@@ -50,21 +50,24 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
return Mono.just(model)
.cache()
- .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
-
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0b647bf5..a0020318 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Mono;
/**
@@ -52,14 +52,15 @@ public class FileCollector {
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout) {
+ Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
.cache()
- .flatMap(fd -> collectFile(fileData, metaData))
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap))
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
//@formatter:on
}
@@ -76,7 +77,9 @@ public class FileCollector {
ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("starting to collectFile");
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 783c699c..89ebde8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
@@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -86,22 +87,24 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask() {
+ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
}
- Flux<ConsumerDmaapModel> createMainTask() {
+ Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
.filter(this::shouldBePublished) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(this::collectFileFromXnf) //
- .flatMap(this::publishToDataRouter) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(model -> publishToDataRouter(model, contextMap)) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
@@ -113,15 +116,18 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete() {
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(ConsumerDmaapModel model) {
+ private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
- private void onError(Throwable throwable) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
@@ -147,38 +153,45 @@ public class ScheduledTasks {
return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
+ Map<String, String> contextMap) {
final long maxNUmberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
+ MdcVariables.setMdcContextMap(contextMap);
return fileCollect.collectorTask
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
+ contextMap)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
logger.error("File fetching failed: {}", localFileName);
- deleteFile(localFileName);
+ deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
DataRouterPublisher publisherTask = createDataRouterPublisher();
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handlePublishFailure(model, exception));
+ MdcVariables.setMdcContextMap(contextMap);
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
- deleteFile(internalFileName);
+ deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
@@ -193,17 +206,20 @@ public class ScheduledTasks {
return Flux.empty();
}
+ Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
.execute() //
- .onErrorResume(this::handleConsumeMessageFailure);
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
- private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
- logger.error("Polling for file ready message filed, exception: {}", exception);
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Polling for file ready message failed, exception: {}", exception);
return Flux.empty();
}
- private void deleteFile(Path localFile) {
+ private void deleteFile(Path localFile, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);