summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java45
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java42
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java30
9 files changed, 145 insertions, 179 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0254597e..0150d86c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -17,10 +17,11 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
+
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -33,7 +34,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Flux;
+
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -56,50 +58,51 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(
+ ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
-
- protected void runTask(Map<String, String> contextMap) {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
- .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ public void runTask() {
+ Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
+ EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
+ .subscribeOn(Schedulers.parallel()) //
+ .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
+ .flatMap(this::parseCloudConfig) //
+ .subscribe(null, this::onError, this::onComplete);
}
- private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ private void onComplete() {
+ logger.trace("Configuration updated");
}
- private void cloudConfigError(Throwable throwable) {
+ private void onError(Throwable throwable) {
logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
- private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
- reactiveCloudConfigurationProvider.callForServiceConfigurationReactive(envProperties)
- .subscribe(this::parseCloudConfig, this::cloudConfigError);
- }
-
- private synchronized void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
+ return Mono.just(this);
}
@Override
public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
+ : super.getDmaapPublisherConfiguration();
}
@Override
public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
+ return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
+ : super.getDmaapConsumerConfiguration();
}
@Override
public synchronized FtpesConfig getFtpesConfiguration() {
- return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
+ return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 8f417261..969a7e05 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -2,17 +2,15 @@
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END==========================================================================
*/
@@ -21,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -37,17 +37,19 @@ class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
- private EnvironmentProcessor() {
- }
+ private EnvironmentProcessor() {}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Loading configuration from system environment variables");
+ static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
+ MDC.setContextMap(contextMap);
+ logger.trace("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
- envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
- .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
- .appName(getService(systemEnvironment)).build();
+ envProperties = ImmutableEnvProperties.builder() //
+ .consulHost(getConsulHost(systemEnvironment)) //
+ .consulPort(getConsultPort(systemEnvironment)) //
+ .cbsName(getConfigBindingService(systemEnvironment)) //
+ .appName(getService(systemEnvironment)) //
+ .build();
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
@@ -57,29 +59,29 @@ class EnvironmentProcessor {
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
}
private static Integer getConsultPort(Properties systemEnvironments) {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "$CONFIG_BINDING_SERVICE environment has not been defined"));
}
private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ return Optional
+ .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index d5c8b3b2..58c77a11 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,27 +16,20 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
@@ -61,8 +54,6 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
- private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
- private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
@@ -94,8 +85,8 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
- MdcVariables.setMdcContextMap(contextMap);
- logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.setContextMap(contextMap);
+ logger.info("Stopped Datafile workflow");
MDC.clear();
return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
@@ -108,21 +99,13 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- String requestId = MDC.get(REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- MDC.put(REQUEST_ID, UUID.randomUUID().toString());
- }
- String invocationId = MDC.get(INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
- }
- contextMap = MDC.getCopyOfContextMap();
- logger.info(ENTRY, "Start scheduling Datafile workflow");
+ contextMap = MappedDiagnosticContext.initializeTraceContext();
+ logger.info("Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index d6c535f0..073c8462 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -2,28 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.controllers;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
@@ -42,19 +42,19 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
- @RequestMapping(value = "heartbeat", method = RequestMethod.GET)
+ @GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
- @ApiResponses(value = {
+ @ApiResponses(value = { //
@ApiResponse(code = 200, message = "DATAFILE service is living"),
@ApiResponse(code = 401, message = "You are not authorized to view the resource"),
@ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
- }
- )
- public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
- return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK))
- );
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+ public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Heartbeat request");
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK));
+ logger.trace("Heartbeat");
+ logger.info(MappedDiagnosticContext.EXIT, "Heartbeat request");
+ return response;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index a21ed041..42949f95 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,24 +18,18 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-import java.util.UUID;
-import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -46,7 +40,7 @@ import reactor.core.publisher.Mono;
*/
@RestController
-@Api(value = "ScheduleController", description = "Schedule Controller")
+@Api(value = "ScheduleController")
public class ScheduleController {
private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
@@ -58,33 +52,29 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
- public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
- }
-
- @RequestMapping(value = "start", method = RequestMethod.GET)
+ @GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
- String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- requestId = UUID.randomUUID().toString();
- }
- String invocationId = headers.getFirst(X_INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- invocationId = UUID.randomUUID().toString();
- }
- MDC.put(REQUEST_ID, requestId);
- MDC.put(INVOCATION_ID, invocationId);
- logger.trace("Receiving start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Start request");
+ Mono<ResponseEntity<String>> response = startTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Start request");
+ return response;
+ }
+
+ public Mono<ResponseEntity<String>> startTasks() {
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
+ .map(this::createStartTaskResponse);
}
- @RequestMapping(value = "stopDatafile", method = RequestMethod.GET)
+ @GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
- public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
- return schedulerConfig.getResponseFromCancellationOfTasks();
+ public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Stop request");
+ Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Stop request");
+ return response;
}
@ApiOperation(value = "Sends success or error response on starting task execution")
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 8c1a2cf4..3546a08f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -20,10 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
@@ -32,7 +28,6 @@ import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -40,9 +35,9 @@ import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +63,7 @@ public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -86,7 +81,7 @@ public class DataRouterPublisher {
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
@@ -101,18 +96,13 @@ public class DataRouterPublisher {
logger.trace("Entering publishFile with {}", consumerDmaapModel);
try {
HttpPut put = new HttpPut();
- String requestId = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationId);
-
prepareHead(consumerDmaapModel, put);
prepareBody(consumerDmaapModel, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
@@ -127,6 +117,7 @@ public class DataRouterPublisher {
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getPublishUri(model.getName()));
+ MappedDiagnosticContext.appendTraceInfo(put);
}
private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
@@ -145,7 +136,7 @@ public class DataRouterPublisher {
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
@@ -164,7 +155,7 @@ public class DataRouterPublisher {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 158bcb29..3e444af0 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -31,9 +31,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import reactor.core.publisher.Mono;
@@ -51,7 +51,7 @@ public class FileCollector {
public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
@@ -61,7 +61,7 @@ public class FileCollector {
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 41f8e3cd..89fa259c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -20,14 +20,10 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import java.io.InputStream;
import java.net.URI;
+import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
@@ -35,8 +31,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +47,7 @@ import org.slf4j.MDC;
public class PublishedChecker {
private static final String FEEDLOG_TOPIC = "feedlog";
private static final String DEFAULT_FEED_ID = "1";
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -73,20 +70,18 @@ public class PublishedChecker {
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
public boolean execute(String fileName, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- DmaapProducerReactiveHttpClient producerClient = resolveClient();
+ MDC.setContextMap(contextMap);
+ DmaapProducerHttpClient producerClient = resolveClient();
HttpGet getRequest = new HttpGet();
- String requestId = MDC.get(REQUEST_ID);
- getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ MappedDiagnosticContext.appendTraceInfo(getRequest);
+
getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
producerClient.addUserCredentialsToHead(getRequest);
try {
HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
@@ -96,12 +91,12 @@ public class PublishedChecker {
return HttpStatus.SC_OK == status && !"[]".equals(body);
}
} catch (Exception e) {
- logger.warn("Unable to check if file has been published.", e);
+ logger.warn("Unable to check if file has been published, file: {}", fileName, e);
return false;
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
return producerClient.getBaseUri() //
.pathSegment(FEEDLOG_TOPIC) //
.pathSegment(DEFAULT_FEED_ID) //
@@ -114,7 +109,7 @@ public class PublishedChecker {
return appConfig.getDmaapPublisherConfiguration();
}
- protected DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 2a6e4c0d..8b496ba2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,13 +72,15 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ public void executeDatafileMainTask() {
try {
- MdcVariables.setMdcContextMap(contextMap);
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ createMainTask(context) //
+ .subscribe(model -> onSuccess(model, context), //
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
@@ -126,17 +128,17 @@ public class ScheduledTasks {
}
private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -150,14 +152,14 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createFileCollector()
.execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFilePath, contextMap);
@@ -167,7 +169,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createDataRouterPublisher()
.execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
@@ -175,7 +177,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
@@ -201,14 +203,14 @@ public class ScheduledTasks {
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
private void deleteFile(Path localFile, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);