summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/pom.xml4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java17
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java66
-rw-r--r--datafile-app-server/src/main/resources/logback-spring.xml49
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java5
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java23
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java47
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java17
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java42
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java9
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java29
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java11
17 files changed, 289 insertions, 135 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index ace0389c..59f8f3b7 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -200,6 +200,10 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<!--REQUIRED TO GENERATE DOCUMENTATION -->
<dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 9838afb1..208e8a43 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -17,15 +17,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig {
}
- protected void runTask() {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel())
+ protected void runTask(Map<String, String> contextMap) {
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
.subscribe(this::parsingConfigSuccess, this::parsingConfigError);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 106725ce..8f417261 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,7 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,14 +13,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END==========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
@@ -38,7 +40,8 @@ class EnvironmentProcessor {
private EnvironmentProcessor() {
}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index bc21f96c..b4dc6353 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,22 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -45,7 +53,11 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
+ private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
+ private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -68,6 +80,9 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.clear();
return Mono.defer(() -> Mono
.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
@@ -80,10 +95,20 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
+ String requestId = MDC.get(REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ MDC.put(REQUEST_ID, UUID.randomUUID().toString());
+ }
+ String invocationId = MDC.get(INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
+ }
+ contextMap = MDC.getCopyOfContextMap();
+ logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
@@ -94,4 +119,4 @@ public class SchedulerConfig {
}
}
-}
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 825308e7..a21ed041 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,16 +18,24 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -50,9 +58,24 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ public Mono<ResponseEntity<String>> startTasks() {
+ logger.trace("Start scheduling worker request");
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ }
+
@RequestMapping(value = "start", method = RequestMethod.GET)
@ApiOperation(value = "Start scheduling worker request")
- public Mono<ResponseEntity<String>> startTasks() {
+ public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
+ String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(REQUEST_ID, requestId);
+ MDC.put(INVOCATION_ID, invocationId);
logger.trace("Receiving start scheduling worker request");
return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 4c0dcce5..57edc364 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -17,16 +17,16 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-
import reactor.core.publisher.Mono;
/**
@@ -50,21 +50,24 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
return Mono.just(model)
.cache()
- .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
-
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0b647bf5..a0020318 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
import java.time.Duration;
-
+import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Mono;
/**
@@ -52,14 +52,15 @@ public class FileCollector {
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout) {
+ Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
.cache()
- .flatMap(fd -> collectFile(fileData, metaData))
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap))
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
//@formatter:on
}
@@ -76,7 +77,9 @@ public class FileCollector {
ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("starting to collectFile");
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 783c699c..89ebde8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
@@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -86,22 +87,24 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask() {
+ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
}
- Flux<ConsumerDmaapModel> createMainTask() {
+ Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
.filter(this::shouldBePublished) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(this::collectFileFromXnf) //
- .flatMap(this::publishToDataRouter) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(model -> publishToDataRouter(model, contextMap)) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
@@ -113,15 +116,18 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete() {
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(ConsumerDmaapModel model) {
+ private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
- private void onError(Throwable throwable) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
@@ -147,38 +153,45 @@ public class ScheduledTasks {
return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
+ Map<String, String> contextMap) {
final long maxNUmberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
+ MdcVariables.setMdcContextMap(contextMap);
return fileCollect.collectorTask
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
+ contextMap)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
logger.error("File fetching failed: {}", localFileName);
- deleteFile(localFileName);
+ deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
DataRouterPublisher publisherTask = createDataRouterPublisher();
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handlePublishFailure(model, exception));
+ MdcVariables.setMdcContextMap(contextMap);
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
- deleteFile(internalFileName);
+ deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
@@ -193,17 +206,20 @@ public class ScheduledTasks {
return Flux.empty();
}
+ Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
.execute() //
- .onErrorResume(this::handleConsumeMessageFailure);
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
- private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
- logger.error("Polling for file ready message filed, exception: {}", exception);
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Polling for file ready message failed, exception: {}", exception);
return Flux.empty();
}
- private void deleteFile(Path localFile) {
+ private void deleteFile(Path localFile, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
diff --git a/datafile-app-server/src/main/resources/logback-spring.xml b/datafile-app-server/src/main/resources/logback-spring.xml
index af4ab189..1b9818d5 100644
--- a/datafile-app-server/src/main/resources/logback-spring.xml
+++ b/datafile-app-server/src/main/resources/logback-spring.xml
@@ -1,19 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}spring.log}"/>
+ <property name="outputFilename" value="application"/>
+ <property name="logPath" value="/var/log/ONAP"/>
+ <property name="maxFileSize" value="50MB"/>
+ <property name="maxHistory" value="30"/>
+ <property name="totalSizeCap" value="10GB"/>
+ <property name="defaultPattern" value="%nopexception%logger
+ |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
+ |%level
+ |%replace(%replace(%message){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%mdc){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%rootException){'\t','\\\\t'}){'\n','\\\\n'}
+ |%replace(%replace(%marker){'\t','\\\\t'}){'\n','\\\\n'}
+ |%thread
+ |%n"/>
<springProfile name="dev">
- <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
+ <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${defaultPattern}</pattern>
+ </encoder>
+ </appender>
+
<appender name="ROLLING-FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
+ <pattern>${defaultPattern}</pattern>
</encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${logPath}/${outputFilename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <MaxFileSize>${maxFileSize}</MaxFileSize>
+ <MaxHistory>${maxHistory}</MaxHistory>
+ <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
</rollingPolicy>
</appender>
<root level="ERROR">
@@ -26,15 +46,14 @@
<appender name="ROLLING-FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
+ <pattern>${defaultPattern}</pattern>
</encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
- <timeBasedFileNamingAndTriggeringPolicy
- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>10MB</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
+ <file>${logPath}/${outputFilename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
+ <MaxFileSize>${maxFileSize}</MaxFileSize>
+ <MaxHistory>${maxHistory}</MaxHistory>
+ <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
</rollingPolicy>
</appender>
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index efb762a8..f41ecf25 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -16,13 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.integration;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
-
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
@@ -57,6 +56,6 @@ class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
}
private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask();
+ verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(any());
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 24b82fe6..d612d17c 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -24,9 +25,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-
import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -36,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -99,10 +99,11 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_ReturnsCorrectStatus() {
prepareMocksForTests(Mono.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel).verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -110,10 +111,11 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_firstFailsThenSucceeds() {
prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel).verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -121,17 +123,18 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_firstFailsThenFails() {
prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 1/1").verify();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@SafeVarargs
final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+ when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
nextHttpResponses);
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 0662216b..8a572be4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -26,11 +26,9 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -45,7 +43,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -161,7 +158,7 @@ public class ScheduledTasksTest {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).execute();
- testedObject.scheduleMainDatafileEventTask();
+ testedObject.scheduleMainDatafileEventTask(any());
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
@@ -178,18 +175,18 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -206,20 +203,20 @@ public class ScheduledTasksTest {
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -232,23 +229,23 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
doReturn(collectedFile, error, collectedFile, collectedFile) //
.when(dataRouterMock) //
- .execute(notNull(), anyLong(), notNull());
+ .execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -264,18 +261,18 @@ public class ScheduledTasksTest {
doReturn(fileReadyMessages).when(consumerMock).execute();
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
- verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 804b46e9..b5d3c159 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -22,10 +22,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-
import java.nio.file.Path;
import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,7 +40,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-
import reactor.test.StepVerifier;
/**
@@ -155,7 +154,8 @@ public class XnfCollectorTaskImplTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -196,7 +196,8 @@ public class XnfCollectorTaskImplTest {
.build();
// @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -211,7 +212,8 @@ public class XnfCollectorTaskImplTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -227,7 +229,8 @@ public class XnfCollectorTaskImplTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java
new file mode 100644
index 00000000..9d882067
--- /dev/null
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import org.slf4j.MDC;
+
+public final class MdcVariables {
+
+ public static final String X_ONAP_REQUEST_ID = "X-ONAP-RequestID";
+ public static final String X_INVOCATION_ID = "X-InvocationID";
+ public static final String REQUEST_ID = "RequestID";
+ public static final String INVOCATION_ID = "InvocationID";
+ public static final String INSTANCE_ID = "InstanceID";
+ public static final String RESPONSE_CODE = "ResponseCode";
+ public static final String SERVICE_NAME = "ServiceName";
+
+ private MdcVariables() {
+ }
+
+ public static void setMdcContextMap(Map<String, String> mdcContextMap) {
+ if (mdcContextMap != null) {
+ MDC.setContextMap(mdcContextMap);
+ }
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index e99b8114..21266fbc 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.service;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.SERVICE_NAME;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.Builder;
-
import reactor.core.publisher.Mono;
/**
@@ -68,17 +69,21 @@ public class DmaapReactiveWebClient {
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
return Mono.just(clientResponse);
});
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
.forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
return Mono.just(clientRequest);
});
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 4869e4c2..f80fcd0f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,9 +16,11 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -28,10 +30,10 @@ import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Future;
-
import javax.net.ssl.SSLContext;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
@@ -45,14 +47,17 @@ import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
import reactor.core.publisher.Mono;
/**
@@ -66,6 +71,8 @@ public class DmaapProducerReactiveHttpClient {
private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
private static final String URI_SEPARATOR = "/";
private static final String DEFAULT_FEED_ID = "1";
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -101,11 +108,11 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
-
webClient = getWebClient();
webClient.start();
@@ -114,9 +121,10 @@ public class DmaapProducerReactiveHttpClient {
prepareBody(consumerDmaapModel, put);
addUserCredentialsToHead(put);
+ logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
Future<HttpResponse> future = webClient.execute(put, null);
HttpResponse response = future.get();
- logger.trace("{}", response);
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
webClient.close();
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -141,6 +149,11 @@ public class DmaapProducerReactiveHttpClient {
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
+
+ String requestID = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestID);
+ String invocationID = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationID);
}
private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index a0d3673d..2bbe8e1d 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -20,19 +20,18 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
@@ -51,7 +50,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
import reactor.test.StepVerifier;
/**
@@ -142,7 +140,8 @@ class DmaapProducerReactiveHttpClientTest {
httpPut.addHeader("Authorization", "Basic " + base64Creds);
fileStream.reset();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
.expectNext(HttpStatus.OK).verifyComplete();
verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
@@ -153,7 +152,7 @@ class DmaapProducerReactiveHttpClientTest {
@Test
void getHttpResponse_Fail() throws Exception {
mockWebClientDependantObject(false);
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, any()))
.expectError()
.verify();
}