From f821c0328fc0d9d579a58c80b0f86be53c896541 Mon Sep 17 00:00:00 2001 From: RehanRaza Date: Fri, 8 Mar 2019 20:16:04 +0000 Subject: DFC logging according to ONAP specification Change-Id: I6fe18ce3bdbc6d0b1cf5c5e65534cab694cfb898 Issue-ID: DCAEGEN2-1305 Signed-off-by: RehanRaza --- .../datafile/configuration/CloudConfiguration.java | 15 ++--- .../configuration/EnvironmentProcessor.java | 13 +++-- .../datafile/configuration/SchedulerConfig.java | 37 ++++++++++-- .../datafile/controllers/ScheduleController.java | 27 ++++++++- .../datafile/tasks/DataRouterPublisher.java | 17 +++--- .../collectors/datafile/tasks/FileCollector.java | 13 +++-- .../collectors/datafile/tasks/ScheduledTasks.java | 66 ++++++++++++++-------- 7 files changed, 129 insertions(+), 59 deletions(-) (limited to 'datafile-app-server/src/main/java') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 9838afb1..208e8a43 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,15 +17,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonObject; - +import java.util.Map; import java.util.Optional; import java.util.Properties; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; - import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; - import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig { } - protected void runTask() { - Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel()) + protected void runTask(Map contextMap) { + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel()) .subscribe(this::parsingConfigSuccess, this::parsingConfigError); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 106725ce..8f417261 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -1,7 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,14 +13,16 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END========================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; +import java.util.Map; import java.util.Optional; import java.util.Properties; import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; @@ -38,7 +40,8 @@ class EnvironmentProcessor { private EnvironmentProcessor() { } - static Mono evaluate(Properties systemEnvironment) { + static Mono evaluate(Properties systemEnvironment, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Loading configuration from system environment variables"); EnvProperties envProperties; try { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index bc21f96c..b4dc6353 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,22 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - +import org.apache.commons.lang3.StringUtils; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -45,7 +53,11 @@ public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); + private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); + private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); private static volatile List> scheduledFutureList = new ArrayList<>(); + private Map contextMap; private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; @@ -68,6 +80,9 @@ public class SchedulerConfig { public synchronized Mono> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); + MdcVariables.setMdcContextMap(contextMap); + logger.info(EXIT, "Stopped Datafile workflow"); + MDC.clear(); return Mono.defer(() -> Mono .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } @@ -80,10 +95,20 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { + String requestId = MDC.get(REQUEST_ID); + if (StringUtils.isBlank(requestId)) { + MDC.put(REQUEST_ID, UUID.randomUUID().toString()); + } + String invocationId = MDC.get(INVOCATION_ID); + if (StringUtils.isBlank(invocationId)) { + MDC.put(INVOCATION_ID, UUID.randomUUID().toString()); + } + contextMap = MDC.getCopyOfContextMap(); + logger.info(ENTRY, "Start scheduling Datafile workflow"); if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); @@ -94,4 +119,4 @@ public class SchedulerConfig { } } -} +} \ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 825308e7..a21ed041 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -18,16 +18,24 @@ package org.onap.dcaegen2.collectors.datafile.controllers; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; - import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -50,9 +58,24 @@ public class ScheduleController { this.schedulerConfig = schedulerConfig; } + public Mono> startTasks() { + logger.trace("Start scheduling worker request"); + return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); + } + @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") - public Mono> startTasks() { + public Mono> startTasks(@RequestHeader HttpHeaders headers) { + String requestId = headers.getFirst(X_ONAP_REQUEST_ID); + if (StringUtils.isBlank(requestId)) { + requestId = UUID.randomUUID().toString(); + } + String invocationId = headers.getFirst(X_INVOCATION_ID); + if (StringUtils.isBlank(invocationId)) { + invocationId = UUID.randomUUID().toString(); + } + MDC.put(REQUEST_ID, requestId); + MDC.put(INVOCATION_ID, invocationId); logger.trace("Receiving start scheduling worker request"); return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 4c0dcce5..57edc364 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -17,16 +17,16 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.time.Duration; - +import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; - import reactor.core.publisher.Mono; /** @@ -50,21 +50,24 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Mono execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, + Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off return Mono.just(model) .cache() - .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap)) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Mono handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { - + private Mono handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, + Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); return Mono.just(model); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 0b647bf5..a0020318 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Path; import java.time.Duration; - +import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Mono; /** @@ -52,14 +52,15 @@ public class FileCollector { } public Mono execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, - Duration firstBackoffTimeout) { + Duration firstBackoffTimeout, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); resolveKeyStore(); //@formatter:off return Mono.just(fileData) .cache() - .flatMap(fd -> collectFile(fileData, metaData)) + .flatMap(fd -> collectFile(fileData, metaData, contextMap)) .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); //@formatter:on } @@ -76,7 +77,9 @@ public class FileCollector { ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); } - private Mono collectFile(FileData fileData, MessageMetaData metaData) { + private Mono collectFile(FileData fileData, MessageMetaData metaData, + Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("starting to collectFile"); final String remoteFile = fileData.remoteFilePath(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 783c699c..89ebde8f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -23,8 +23,8 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; @@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -86,22 +87,24 @@ public class ScheduledTasks { /** * Main function for scheduling for the file collection Workflow. */ - public void scheduleMainDatafileEventTask() { + public void scheduleMainDatafileEventTask(Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Execution of tasks was registered"); applicationConfiguration.initFileStreamReader(); - createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); } - Flux createMainTask() { + Flux createMainTask(Map contextMap) { return fetchMoreFileReadyMessages() // .parallel(getParallelism()) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .flatMap(this::createFileCollectionTask) // .filter(this::shouldBePublished) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(this::collectFileFromXnf) // - .flatMap(this::publishToDataRouter) // - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) // + .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(model -> publishToDataRouter(model, contextMap)) // + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); } @@ -113,15 +116,18 @@ public class ScheduledTasks { alreadyPublishedFiles.purge(now); } - private void onComplete() { + private void onComplete(Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Datafile tasks have been completed"); } - private void onSuccess(ConsumerDmaapModel model) { + private void onSuccess(ConsumerDmaapModel model, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } - private void onError(Throwable throwable) { + private void onError(Throwable throwable, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); } @@ -147,38 +153,45 @@ public class ScheduledTasks { return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; } - private Mono collectFileFromXnf(FileCollectionData fileCollect) { + private Mono collectFileFromXnf(FileCollectionData fileCollect, + Map contextMap) { final long maxNUmberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); + MdcVariables.setMdcContextMap(contextMap); return fileCollect.collectorTask - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, + contextMap) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); } - private Mono handleCollectFailure(FileData fileData) { + private Mono handleCollectFailure(FileData fileData, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); logger.error("File fetching failed: {}", localFileName); - deleteFile(localFileName); + deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Mono publishToDataRouter(ConsumerDmaapModel model) { + private Mono publishToDataRouter(ConsumerDmaapModel model, Map contextMap) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); DataRouterPublisher publisherTask = createDataRouterPublisher(); - return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handlePublishFailure(model, exception)); + MdcVariables.setMdcContextMap(contextMap); + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); } - private Mono handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono handlePublishFailure(ConsumerDmaapModel model, Throwable exception, + Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); - deleteFile(internalFileName); + deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); @@ -193,17 +206,20 @@ public class ScheduledTasks { return Flux.empty(); } + Map contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // .execute() // - .onErrorResume(this::handleConsumeMessageFailure); + .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); } - private Flux handleConsumeMessageFailure(Throwable exception) { - logger.error("Polling for file ready message filed, exception: {}", exception); + private Flux handleConsumeMessageFailure(Throwable exception, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Polling for file ready message failed, exception: {}", exception); return Flux.empty(); } - private void deleteFile(Path localFile) { + private void deleteFile(Path localFile, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); -- cgit 1.2.3-korg