diff options
Diffstat (limited to 'datafile-app-server')
13 files changed, 214 insertions, 119 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index ace0389c..59f8f3b7 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -200,6 +200,10 @@ <artifactId>testng</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> <!--REQUIRED TO GENERATE DOCUMENTATION --> <dependency> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 9838afb1..208e8a43 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,15 +17,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonObject; - +import java.util.Map; import java.util.Optional; import java.util.Properties; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; - import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; - import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig { } - protected void runTask() { - Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel()) + protected void runTask(Map<String, String> contextMap) { + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel()) .subscribe(this::parsingConfigSuccess, this::parsingConfigError); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 106725ce..8f417261 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -1,7 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,14 +13,16 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END========================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; +import java.util.Map; import java.util.Optional; import java.util.Properties; import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; @@ -38,7 +40,8 @@ class EnvironmentProcessor { private EnvironmentProcessor() { } - static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Loading configuration from system environment variables"); EnvProperties envProperties; try { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index bc21f96c..b4dc6353 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,22 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - +import org.apache.commons.lang3.StringUtils; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -45,7 +53,11 @@ public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); + private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); + private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); + private Map<String, String> contextMap; private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; @@ -68,6 +80,9 @@ public class SchedulerConfig { public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); + MdcVariables.setMdcContextMap(contextMap); + logger.info(EXIT, "Stopped Datafile workflow"); + MDC.clear(); return Mono.defer(() -> Mono .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } @@ -80,10 +95,20 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { + String requestId = MDC.get(REQUEST_ID); + if (StringUtils.isBlank(requestId)) { + MDC.put(REQUEST_ID, UUID.randomUUID().toString()); + } + String invocationId = MDC.get(INVOCATION_ID); + if (StringUtils.isBlank(invocationId)) { + MDC.put(INVOCATION_ID, UUID.randomUUID().toString()); + } + contextMap = MDC.getCopyOfContextMap(); + logger.info(ENTRY, "Start scheduling Datafile workflow"); if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); @@ -94,4 +119,4 @@ public class SchedulerConfig { } } -} +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 825308e7..a21ed041 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -18,16 +18,24 @@ package org.onap.dcaegen2.collectors.datafile.controllers; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; - import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -50,9 +58,24 @@ public class ScheduleController { this.schedulerConfig = schedulerConfig; } + public Mono<ResponseEntity<String>> startTasks() { + logger.trace("Start scheduling worker request"); + return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); + } + @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") - public Mono<ResponseEntity<String>> startTasks() { + public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) { + String requestId = headers.getFirst(X_ONAP_REQUEST_ID); + if (StringUtils.isBlank(requestId)) { + requestId = UUID.randomUUID().toString(); + } + String invocationId = headers.getFirst(X_INVOCATION_ID); + if (StringUtils.isBlank(invocationId)) { + invocationId = UUID.randomUUID().toString(); + } + MDC.put(REQUEST_ID, requestId); + MDC.put(INVOCATION_ID, invocationId); logger.trace("Receiving start scheduling worker request"); return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 4c0dcce5..57edc364 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -17,16 +17,16 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.time.Duration; - +import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; - import reactor.core.publisher.Mono; /** @@ -50,21 +50,24 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, + Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off return Mono.just(model) .cache() - .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap)) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { - + private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, + Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); return Mono.just(model); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 0b647bf5..a0020318 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Path; import java.time.Duration; - +import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Mono; /** @@ -52,14 +52,15 @@ public class FileCollector { } public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, - Duration firstBackoffTimeout) { + Duration firstBackoffTimeout, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); resolveKeyStore(); //@formatter:off return Mono.just(fileData) .cache() - .flatMap(fd -> collectFile(fileData, metaData)) + .flatMap(fd -> collectFile(fileData, metaData, contextMap)) .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); //@formatter:on } @@ -76,7 +77,9 @@ public class FileCollector { ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) { + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, + Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("starting to collectFile"); final String remoteFile = fileData.remoteFilePath(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 783c699c..89ebde8f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -23,8 +23,8 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; @@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -86,22 +87,24 @@ public class ScheduledTasks { /** * Main function for scheduling for the file collection Workflow. */ - public void scheduleMainDatafileEventTask() { + public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Execution of tasks was registered"); applicationConfiguration.initFileStreamReader(); - createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); } - Flux<ConsumerDmaapModel> createMainTask() { + Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // .parallel(getParallelism()) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .flatMap(this::createFileCollectionTask) // .filter(this::shouldBePublished) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(this::collectFileFromXnf) // - .flatMap(this::publishToDataRouter) // - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) // + .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(model -> publishToDataRouter(model, contextMap)) // + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); } @@ -113,15 +116,18 @@ public class ScheduledTasks { alreadyPublishedFiles.purge(now); } - private void onComplete() { + private void onComplete(Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Datafile tasks have been completed"); } - private void onSuccess(ConsumerDmaapModel model) { + private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } - private void onError(Throwable throwable) { + private void onError(Throwable throwable, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); } @@ -147,38 +153,45 @@ public class ScheduledTasks { return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; } - private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { + private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect, + Map<String, String> contextMap) { final long maxNUmberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); + MdcVariables.setMdcContextMap(contextMap); return fileCollect.collectorTask - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, + contextMap) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) { + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); logger.error("File fetching failed: {}", localFileName); - deleteFile(localFileName); + deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); DataRouterPublisher publisherTask = createDataRouterPublisher(); - return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handlePublishFailure(model, exception)); + MdcVariables.setMdcContextMap(contextMap); + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception, + Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); - deleteFile(internalFileName); + deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); @@ -193,17 +206,20 @@ public class ScheduledTasks { return Flux.empty(); } + Map<String, String> contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // .execute() // - .onErrorResume(this::handleConsumeMessageFailure); + .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); } - private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { - logger.error("Polling for file ready message filed, exception: {}", exception); + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Polling for file ready message failed, exception: {}", exception); return Flux.empty(); } - private void deleteFile(Path localFile) { + private void deleteFile(Path localFile, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); diff --git a/datafile-app-server/src/main/resources/logback-spring.xml b/datafile-app-server/src/main/resources/logback-spring.xml index af4ab189..1b9818d5 100644 --- a/datafile-app-server/src/main/resources/logback-spring.xml +++ b/datafile-app-server/src/main/resources/logback-spring.xml @@ -1,19 +1,39 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}spring.log}"/> + <property name="outputFilename" value="application"/> + <property name="logPath" value="/var/log/ONAP"/> + <property name="maxFileSize" value="50MB"/> + <property name="maxHistory" value="30"/> + <property name="totalSizeCap" value="10GB"/> + <property name="defaultPattern" value="%nopexception%logger + |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} + |%level + |%replace(%replace(%message){'\t','\\\\t'}){'\n','\\\\n'} + |%replace(%replace(%mdc){'\t','\\\\t'}){'\n','\\\\n'} + |%replace(%replace(%rootException){'\t','\\\\t'}){'\n','\\\\n'} + |%replace(%replace(%marker){'\t','\\\\t'}){'\n','\\\\n'} + |%thread + |%n"/> <springProfile name="dev"> - <include resource="org/springframework/boot/logging/logback/console-appender.xml"/> + <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${defaultPattern}</pattern> + </encoder> + </appender> + <appender name="ROLLING-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> + <pattern>${defaultPattern}</pattern> </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <file>${logPath}/${outputFilename}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern> + <MaxFileSize>${maxFileSize}</MaxFileSize> + <MaxHistory>${maxHistory}</MaxHistory> + <TotalSizeCap>${totalSizeCap}</TotalSizeCap> </rollingPolicy> </appender> <root level="ERROR"> @@ -26,15 +46,14 @@ <appender name="ROLLING-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> + <pattern>${defaultPattern}</pattern> </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern> - <timeBasedFileNamingAndTriggeringPolicy - class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> - <maxFileSize>10MB</maxFileSize> - </timeBasedFileNamingAndTriggeringPolicy> + <file>${logPath}/${outputFilename}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern> + <MaxFileSize>${maxFileSize}</MaxFileSize> + <MaxHistory>${maxHistory}</MaxHistory> + <TotalSizeCap>${totalSizeCap}</TotalSizeCap> </rollingPolicy> </appender> diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java index efb762a8..f41ecf25 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java @@ -16,13 +16,12 @@ package org.onap.dcaegen2.collectors.datafile.integration; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; - import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension; @@ -57,6 +56,6 @@ class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests { } private void verifyDmaapConsumerTask() { - verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(); + verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(any()); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 24b82fe6..d612d17c 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -24,9 +25,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; - import java.time.Duration; - +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -36,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.springframework.http.HttpStatus; - import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -99,10 +99,11 @@ class DataRouterPublisherTest { public void whenPassedObjectFits_ReturnsCorrectStatus() { prepareMocksForTests(Mono.just(HttpStatus.OK)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectNext(consumerDmaapModel).verifyComplete(); - verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); + verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap)); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @@ -110,10 +111,11 @@ class DataRouterPublisherTest { public void whenPassedObjectFits_firstFailsThenSucceeds() { prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectNext(consumerDmaapModel).verifyComplete(); - verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap)); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @@ -121,17 +123,18 @@ class DataRouterPublisherTest { public void whenPassedObjectFits_firstFailsThenFails() { prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectErrorMessage("Retries exhausted: 1/1").verify(); - verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap)); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @SafeVarargs final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, + when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse, nextHttpResponses); dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 0662216b..8a572be4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -26,11 +26,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; - import java.time.Duration; import java.util.LinkedList; import java.util.List; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -45,7 +43,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -161,7 +158,7 @@ public class ScheduledTasksTest { doReturn(consumerMock).when(testedObject).createConsumerTask(); doReturn(Flux.empty()).when(consumerMock).execute(); - testedObject.scheduleMainDatafileEventTask(); + testedObject.scheduleMainDatafileEventTask(any()); assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); @@ -178,18 +175,18 @@ public class ScheduledTasksTest { doReturn(fileReadyMessages).when(consumerMock).execute(); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); - doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // .expectNextCount(noOfFiles) // .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull()); - verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull()); + verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -206,20 +203,20 @@ public class ScheduledTasksTest { // First file collect will fail, 3 will succeed doReturn(error, collectedFile, collectedFile, collectedFile) // .when(fileCollectorMock) // - .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class)); + .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any()); - doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); - doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // .expectNextCount(3) // .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); - verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull()); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -232,23 +229,23 @@ public class ScheduledTasksTest { doReturn(fileReadyMessages).when(consumerMock).execute(); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); Mono<Object> error = Mono.error(new Exception("problem")); // One publish will fail, the rest will succeed doReturn(collectedFile, error, collectedFile, collectedFile) // .when(dataRouterMock) // - .execute(notNull(), anyLong(), notNull()); + .execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // .expectNextCount(3) // 3 completed files .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); - verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull()); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -264,18 +261,18 @@ public class ScheduledTasksTest { doReturn(fileReadyMessages).when(consumerMock).execute(); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); - doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // .expectNextCount(1) // 99 is skipped .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull()); - verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull()); + verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 804b46e9..b5d3c159 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -22,10 +22,10 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; - import java.nio.file.Path; import java.time.Duration; - +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -40,7 +40,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; - import reactor.test.StepVerifier; /** @@ -155,7 +154,8 @@ public class XnfCollectorTaskImplTest { ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -196,7 +196,8 @@ public class XnfCollectorTaskImplTest { .build(); // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -211,7 +212,8 @@ public class XnfCollectorTaskImplTest { doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) .expectErrorMessage("Retries exhausted: 3/3").verify(); verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -227,7 +229,8 @@ public class XnfCollectorTaskImplTest { ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); |