aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-04-04 14:41:34 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-04-04 14:41:34 +0000
commit814ddd12e695433b8c6a760cc9424dc1c0bae4d1 (patch)
treee0d5d295e8419311f7f53b2baa0501e395150163 /datafile-app-server/src/main
parentd90535ce664cfde22e0670c950498a114388df99 (diff)
Improved logging
Fixed problem with startup which lead to that the REST API was not working running locally. Fixed problem with DmaapProducerHttpClient which would use no timeout, which can lead to infinitly haninging threads. A long timeout is used instead. Change-Id: I28469b1b3aaad0dab4cf247bb8af968e71a60133 Issue-ID: DCAEGEN2-1305 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java45
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java42
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java30
9 files changed, 145 insertions, 179 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0254597e..0150d86c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -17,10 +17,11 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
+
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -33,7 +34,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Flux;
+
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -56,50 +58,51 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(
+ ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
-
- protected void runTask(Map<String, String> contextMap) {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
- .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ public void runTask() {
+ Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
+ EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
+ .subscribeOn(Schedulers.parallel()) //
+ .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
+ .flatMap(this::parseCloudConfig) //
+ .subscribe(null, this::onError, this::onComplete);
}
- private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ private void onComplete() {
+ logger.trace("Configuration updated");
}
- private void cloudConfigError(Throwable throwable) {
+ private void onError(Throwable throwable) {
logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
- private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
- reactiveCloudConfigurationProvider.callForServiceConfigurationReactive(envProperties)
- .subscribe(this::parseCloudConfig, this::cloudConfigError);
- }
-
- private synchronized void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
+ return Mono.just(this);
}
@Override
public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
+ : super.getDmaapPublisherConfiguration();
}
@Override
public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
+ return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
+ : super.getDmaapConsumerConfiguration();
}
@Override
public synchronized FtpesConfig getFtpesConfiguration() {
- return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
+ return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 8f417261..969a7e05 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -2,17 +2,15 @@
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END==========================================================================
*/
@@ -21,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -37,17 +37,19 @@ class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
- private EnvironmentProcessor() {
- }
+ private EnvironmentProcessor() {}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Loading configuration from system environment variables");
+ static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
+ MDC.setContextMap(contextMap);
+ logger.trace("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
- envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
- .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
- .appName(getService(systemEnvironment)).build();
+ envProperties = ImmutableEnvProperties.builder() //
+ .consulHost(getConsulHost(systemEnvironment)) //
+ .consulPort(getConsultPort(systemEnvironment)) //
+ .cbsName(getConfigBindingService(systemEnvironment)) //
+ .appName(getService(systemEnvironment)) //
+ .build();
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
@@ -57,29 +59,29 @@ class EnvironmentProcessor {
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
}
private static Integer getConsultPort(Properties systemEnvironments) {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "$CONFIG_BINDING_SERVICE environment has not been defined"));
}
private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ return Optional
+ .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index d5c8b3b2..58c77a11 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,27 +16,20 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
@@ -61,8 +54,6 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
- private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
- private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
@@ -94,8 +85,8 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
- MdcVariables.setMdcContextMap(contextMap);
- logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.setContextMap(contextMap);
+ logger.info("Stopped Datafile workflow");
MDC.clear();
return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
@@ -108,21 +99,13 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- String requestId = MDC.get(REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- MDC.put(REQUEST_ID, UUID.randomUUID().toString());
- }
- String invocationId = MDC.get(INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
- }
- contextMap = MDC.getCopyOfContextMap();
- logger.info(ENTRY, "Start scheduling Datafile workflow");
+ contextMap = MappedDiagnosticContext.initializeTraceContext();
+ logger.info("Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index d6c535f0..073c8462 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -2,28 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.controllers;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
@@ -42,19 +42,19 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
- @RequestMapping(value = "heartbeat", method = RequestMethod.GET)
+ @GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
- @ApiResponses(value = {
+ @ApiResponses(value = { //
@ApiResponse(code = 200, message = "DATAFILE service is living"),
@ApiResponse(code = 401, message = "You are not authorized to view the resource"),
@ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
- }
- )
- public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
- return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK))
- );
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+ public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Heartbeat request");
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK));
+ logger.trace("Heartbeat");
+ logger.info(MappedDiagnosticContext.EXIT, "Heartbeat request");
+ return response;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index a21ed041..42949f95 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,24 +18,18 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-import java.util.UUID;
-import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -46,7 +40,7 @@ import reactor.core.publisher.Mono;
*/
@RestController
-@Api(value = "ScheduleController", description = "Schedule Controller")
+@Api(value = "ScheduleController")
public class ScheduleController {
private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
@@ -58,33 +52,29 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
- public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
- }
-
- @RequestMapping(value = "start", method = RequestMethod.GET)
+ @GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
- String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- requestId = UUID.randomUUID().toString();
- }
- String invocationId = headers.getFirst(X_INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- invocationId = UUID.randomUUID().toString();
- }
- MDC.put(REQUEST_ID, requestId);
- MDC.put(INVOCATION_ID, invocationId);
- logger.trace("Receiving start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Start request");
+ Mono<ResponseEntity<String>> response = startTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Start request");
+ return response;
+ }
+
+ public Mono<ResponseEntity<String>> startTasks() {
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
+ .map(this::createStartTaskResponse);
}
- @RequestMapping(value = "stopDatafile", method = RequestMethod.GET)
+ @GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
- public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
- return schedulerConfig.getResponseFromCancellationOfTasks();
+ public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Stop request");
+ Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Stop request");
+ return response;
}
@ApiOperation(value = "Sends success or error response on starting task execution")
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 8c1a2cf4..3546a08f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -20,10 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
@@ -32,7 +28,6 @@ import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -40,9 +35,9 @@ import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +63,7 @@ public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -86,7 +81,7 @@ public class DataRouterPublisher {
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
@@ -101,18 +96,13 @@ public class DataRouterPublisher {
logger.trace("Entering publishFile with {}", consumerDmaapModel);
try {
HttpPut put = new HttpPut();
- String requestId = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationId);
-
prepareHead(consumerDmaapModel, put);
prepareBody(consumerDmaapModel, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
@@ -127,6 +117,7 @@ public class DataRouterPublisher {
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getPublishUri(model.getName()));
+ MappedDiagnosticContext.appendTraceInfo(put);
}
private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
@@ -145,7 +136,7 @@ public class DataRouterPublisher {
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
@@ -164,7 +155,7 @@ public class DataRouterPublisher {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 158bcb29..3e444af0 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -31,9 +31,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import reactor.core.publisher.Mono;
@@ -51,7 +51,7 @@ public class FileCollector {
public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
@@ -61,7 +61,7 @@ public class FileCollector {
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 41f8e3cd..89fa259c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -20,14 +20,10 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import java.io.InputStream;
import java.net.URI;
+import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
@@ -35,8 +31,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +47,7 @@ import org.slf4j.MDC;
public class PublishedChecker {
private static final String FEEDLOG_TOPIC = "feedlog";
private static final String DEFAULT_FEED_ID = "1";
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -73,20 +70,18 @@ public class PublishedChecker {
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
public boolean execute(String fileName, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- DmaapProducerReactiveHttpClient producerClient = resolveClient();
+ MDC.setContextMap(contextMap);
+ DmaapProducerHttpClient producerClient = resolveClient();
HttpGet getRequest = new HttpGet();
- String requestId = MDC.get(REQUEST_ID);
- getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ MappedDiagnosticContext.appendTraceInfo(getRequest);
+
getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
producerClient.addUserCredentialsToHead(getRequest);
try {
HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
@@ -96,12 +91,12 @@ public class PublishedChecker {
return HttpStatus.SC_OK == status && !"[]".equals(body);
}
} catch (Exception e) {
- logger.warn("Unable to check if file has been published.", e);
+ logger.warn("Unable to check if file has been published, file: {}", fileName, e);
return false;
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
return producerClient.getBaseUri() //
.pathSegment(FEEDLOG_TOPIC) //
.pathSegment(DEFAULT_FEED_ID) //
@@ -114,7 +109,7 @@ public class PublishedChecker {
return appConfig.getDmaapPublisherConfiguration();
}
- protected DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 2a6e4c0d..8b496ba2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,13 +72,15 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ public void executeDatafileMainTask() {
try {
- MdcVariables.setMdcContextMap(contextMap);
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ createMainTask(context) //
+ .subscribe(model -> onSuccess(model, context), //
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
@@ -126,17 +128,17 @@ public class ScheduledTasks {
}
private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -150,14 +152,14 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createFileCollector()
.execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFilePath, contextMap);
@@ -167,7 +169,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createDataRouterPublisher()
.execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
@@ -175,7 +177,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
@@ -201,14 +203,14 @@ public class ScheduledTasks {
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
private void deleteFile(Path localFile, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);