summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java17
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java66
-rw-r--r--datafile-app-server/src/main/resources/logback-spring.xml49
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java5
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java23
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java47
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java17
12 files changed, 210 insertions, 119 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 9838afb1..208e8a43 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -17,15 +17,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig {
}
- protected void runTask() {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel())
+ protected void runTask(Map<String, String> contextMap) {
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
.subscribe(this::parsingConfigSuccess, this::parsingConfigError);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 106725ce..8f417261 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,7 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,14 +13,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END==========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
@@ -38,7 +40,8 @@ class EnvironmentProcessor {
private EnvironmentProcessor() {
}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index bc21f96c..b4dc6353 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,22 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -45,7 +53,11 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
+ private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -68,6 +80,9 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.clear();
return Mono.defer(() -> Mono
.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
@@ -80,10 +95,20 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
+ String requestId = MDC.get(REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ MDC.put(REQUEST_ID, UUID.randomUUID().toString());
+ }
+ String invocationId = MDC.get(INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
+ }
+ contextMap = MDC.getCopyOfContextMap();
+ logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
@@ -94,4 +119,4 @@ public class SchedulerConfig {
}
}
-}
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 825308e7..a21ed041 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,16 +18,24 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -50,9 +58,24 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ public Mono<ResponseEntity<String>> startTasks() {
+ logger.trace("Start scheduling worker request");
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ }
+
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
- public Mono<ResponseEntity<String>> startTasks() {
+ public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
+ String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(REQUEST_ID, requestId);
+ MDC.put(INVOCATION_ID, invocationId);
logger.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 4c0dcce5..57edc364 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -17,16 +17,16 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-
import reactor.core.publisher.Mono;
/**
@@ -50,21 +50,24 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
return Mono.just(model)
.cache()
- .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
-
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0b647bf5..a0020318 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Mono;
/**
@@ -52,14 +52,15 @@ public class FileCollector {
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout) {
+ Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
.cache()
- .flatMap(fd -> collectFile(fileData, metaData))
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap))
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
//@formatter:on
}
@@ -76,7 +77,9 @@ public class FileCollector {
ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("starting to collectFile");
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 783c699c..89ebde8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
@@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -86,22 +87,24 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask() {
+ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
}
- Flux<ConsumerDmaapModel> createMainTask() {
+ Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
.filter(this::shouldBePublished) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(this::collectFileFromXnf) //
- .flatMap(this::publishToDataRouter) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(model -> publishToDataRouter(model, contextMap)) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
@@ -113,15 +116,18 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete() {
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(ConsumerDmaapModel model) {
+ private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
- private void onError(Throwable throwable) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
@@ -147,38 +153,45 @@ public class ScheduledTasks {
return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
+ Map<String, String> contextMap) {
final long maxNUmberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
+ MdcVariables.setMdcContextMap(contextMap);
return fileCollect.collectorTask
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
+ contextMap)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
logger.error("File fetching failed: {}", localFileName);
- deleteFile(localFileName);
+ deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
DataRouterPublisher publisherTask = createDataRouterPublisher();
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handlePublishFailure(model, exception));
+ MdcVariables.setMdcContextMap(contextMap);
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
- deleteFile(internalFileName);
+ deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
@@ -193,17 +206,20 @@ public class ScheduledTasks {
return Flux.empty();
}
+ Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
.execute() //
- .onErrorResume(this::handleConsumeMessageFailure);
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
- private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
- logger.error("Polling for file ready message filed, exception: {}", exception);
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Polling for file ready message failed, exception: {}", exception);
return Flux.empty();
}
- private void deleteFile(Path localFile) {
+ private void deleteFile(Path localFile, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
diff --git a/datafile-app-server/src/main/resources/logback-spring.xml b/datafile-app-server/src/main/resources/logback-spring.xml
index af4ab189..1b9818d5 100644
--- a/datafile-app-server/src/main/resources/logback-spring.xml
+++ b/datafile-app-server/src/main/resources/logback-spring.xml
@@ -1,19 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}spring.log}"/>
+ <property name="outputFilename" value="application"/>
+ <property name="logPath" value="/var/log/ONAP"/>
+ <property name="maxFileSize" value="50MB"/>
+ <property name="maxHistory" value="30"/>
+ <property name="totalSizeCap" value="10GB"/>
+ <property name="defaultPattern" value="%nopexception%logger
+ |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
+ |%level
+ |%replace(%replace(%message){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%mdc){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%rootException){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%marker){'\t','\\\\t'}){'\n','\\\\n'}
+ |%thread
+ |%n"/>
<springProfile name="dev">
- <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
+ <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${defaultPattern}</pattern>
+ </encoder>
+ </appender>
+
<appender name="ROLLING-FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
+ <pattern>${defaultPattern}</pattern>
</encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${logPath}/${outputFilename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <MaxFileSize>${maxFileSize}</MaxFileSize>
+ <MaxHistory>${maxHistory}</MaxHistory>
+ <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
</rollingPolicy>
</appender>
<root level="ERROR">
@@ -26,15 +46,14 @@
<appender name="ROLLING-FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
+ <pattern>${defaultPattern}</pattern>
</encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
- <timeBasedFileNamingAndTriggeringPolicy
- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>10MB</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
+ <file>${logPath}/${outputFilename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
+ <MaxFileSize>${maxFileSize}</MaxFileSize>
+ <MaxHistory>${maxHistory}</MaxHistory>
+ <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
</rollingPolicy>
</appender>
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index efb762a8..f41ecf25 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -16,13 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.integration;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
-
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
@@ -57,6 +56,6 @@ class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
}
private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask();
+ verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(any());
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 24b82fe6..d612d17c 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -24,9 +25,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-
import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -36,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -99,10 +99,11 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_ReturnsCorrectStatus() {
prepareMocksForTests(Mono.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel).verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -110,10 +111,11 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_firstFailsThenSucceeds() {
prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel).verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -121,17 +123,18 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_firstFailsThenFails() {
prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 1/1").verify();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@SafeVarargs
final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+ when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
nextHttpResponses);
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 0662216b..8a572be4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -26,11 +26,9 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -45,7 +43,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -161,7 +158,7 @@ public class ScheduledTasksTest {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).execute();
- testedObject.scheduleMainDatafileEventTask();
+ testedObject.scheduleMainDatafileEventTask(any());
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
@@ -178,18 +175,18 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -206,20 +203,20 @@ public class ScheduledTasksTest {
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -232,23 +229,23 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
doReturn(collectedFile, error, collectedFile, collectedFile) //
.when(dataRouterMock) //
- .execute(notNull(), anyLong(), notNull());
+ .execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -264,18 +261,18 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 804b46e9..b5d3c159 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -22,10 +22,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-
import java.nio.file.Path;
import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,7 +40,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-
import reactor.test.StepVerifier;
/**
@@ -155,7 +154,8 @@ public class XnfCollectorTaskImplTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -196,7 +196,8 @@ public class XnfCollectorTaskImplTest {
.build();
// @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -211,7 +212,8 @@ public class XnfCollectorTaskImplTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -227,7 +229,8 @@ public class XnfCollectorTaskImplTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);