diff options
Diffstat (limited to 'datafile-app-server/src/main/java')
9 files changed, 145 insertions, 179 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 0254597e..0150d86c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -17,10 +17,11 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonObject; + import java.util.Map; -import java.util.Optional; import java.util.Properties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; + +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -33,7 +34,8 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Flux; + +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -56,50 +58,51 @@ public class CloudConfiguration extends AppConfig { private Properties systemEnvironment; @Autowired - public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { + public synchronized void setThreadPoolTaskScheduler( + ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; } - - protected void runTask(Map<String, String> contextMap) { - Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel()) - .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + public void runTask() { + Map<String,String> context = MappedDiagnosticContext.initializeTraceContext(); + EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) // + .subscribeOn(Schedulers.parallel()) // + .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) // + .flatMap(this::parseCloudConfig) // + .subscribe(null, this::onError, this::onComplete); } - private void parsingConfigError(Throwable throwable) { - logger.warn("Error in case of processing system environment, more details below: ", throwable); + private void onComplete() { + logger.trace("Configuration updated"); } - private void cloudConfigError(Throwable throwable) { + private void onError(Throwable throwable) { logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); } - private void parsingConfigSuccess(EnvProperties envProperties) { - logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul"); - reactiveCloudConfigurationProvider.callForServiceConfigurationReactive(envProperties) - .subscribe(this::parseCloudConfig, this::cloudConfigError); - } - - private synchronized void parseCloudConfig(JsonObject jsonObject) { + private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) { logger.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig(); + return Mono.just(this); } @Override public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration + : super.getDmaapPublisherConfiguration(); } @Override public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); + return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration + : super.getDmaapConsumerConfiguration(); } @Override public synchronized FtpesConfig getFtpesConfiguration() { - return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration()); + return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration(); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 8f417261..969a7e05 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -2,17 +2,15 @@ * ============LICENSE_START======================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END========================================================================== */ @@ -21,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -37,17 +37,19 @@ class EnvironmentProcessor { private static final int DEFAULT_CONSUL_PORT = 8500; private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); - private EnvironmentProcessor() { - } + private EnvironmentProcessor() {} - static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Loading configuration from system environment variables"); + static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) { + MDC.setContextMap(contextMap); + logger.trace("Loading configuration from system environment variables"); EnvProperties envProperties; try { - envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) - .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) - .appName(getService(systemEnvironment)).build(); + envProperties = ImmutableEnvProperties.builder() // + .consulHost(getConsulHost(systemEnvironment)) // + .consulPort(getConsultPort(systemEnvironment)) // + .cbsName(getConfigBindingService(systemEnvironment)) // + .appName(getService(systemEnvironment)) // + .build(); } catch (EnvironmentLoaderException e) { return Mono.error(e); } @@ -57,29 +59,29 @@ class EnvironmentProcessor { private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) - .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); + .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); } private static Integer getConsultPort(Properties systemEnvironments) { return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) - .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); + .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); } private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) - .orElseThrow( - () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); + return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) // + .orElseThrow(() -> new EnvironmentLoaderException( + "$CONFIG_BINDING_SERVICE environment has not been defined")); } private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) - .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) - .orElseThrow(() -> new EnvironmentLoaderException( - "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); + return Optional + .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) + .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) + .orElseThrow(() -> new EnvironmentLoaderException( + "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); } private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT environment has not been defined"); logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); return DEFAULT_CONSUL_PORT; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index d5c8b3b2..58c77a11 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,27 +16,20 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; - import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ScheduledFuture; import javax.annotation.PostConstruct; -import org.apache.commons.lang3.StringUtils; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; @@ -61,8 +54,6 @@ public class SchedulerConfig { private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); - private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); - private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private Map<String, String> contextMap; @@ -94,8 +85,8 @@ public class SchedulerConfig { public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); - MdcVariables.setMdcContextMap(contextMap); - logger.info(EXIT, "Stopped Datafile workflow"); + MDC.setContextMap(contextMap); + logger.info("Stopped Datafile workflow"); MDC.clear(); return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)); } @@ -108,21 +99,13 @@ public class SchedulerConfig { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { - String requestId = MDC.get(REQUEST_ID); - if (StringUtils.isBlank(requestId)) { - MDC.put(REQUEST_ID, UUID.randomUUID().toString()); - } - String invocationId = MDC.get(INVOCATION_ID); - if (StringUtils.isBlank(invocationId)) { - MDC.put(INVOCATION_ID, UUID.randomUUID().toString()); - } - contextMap = MDC.getCopyOfContextMap(); - logger.info(ENTRY, "Start scheduling Datafile workflow"); + contextMap = MappedDiagnosticContext.initializeTraceContext(); + logger.info("Start scheduling Datafile workflow"); if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); scheduledFutureList.add( - taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), + taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java index d6c535f0..073c8462 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java @@ -2,28 +2,28 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.controllers; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; import io.swagger.annotations.Api; @@ -42,19 +42,19 @@ public class HeartbeatController { private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); - @RequestMapping(value = "heartbeat", method = RequestMethod.GET) + @GetMapping("/heartbeat") @ApiOperation(value = "Returns liveness of DATAFILE service") - @ApiResponses(value = { + @ApiResponses(value = { // @ApiResponse(code = 200, message = "DATAFILE service is living"), @ApiResponse(code = 401, message = "You are not authorized to view the resource"), @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") - } - ) - public Mono<ResponseEntity<String>> heartbeat() { - logger.trace("Receiving heartbeat request"); - return Mono.defer(() -> - Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK)) - ); + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) + public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(MappedDiagnosticContext.ENTRY, "Heartbeat request"); + Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK)); + logger.trace("Heartbeat"); + logger.info(MappedDiagnosticContext.EXIT, "Heartbeat request"); + return response; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index a21ed041..42949f95 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -18,24 +18,18 @@ package org.onap.dcaegen2.collectors.datafile.controllers; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; -import java.util.UUID; -import org.apache.commons.lang3.StringUtils; import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestHeader; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -46,7 +40,7 @@ import reactor.core.publisher.Mono; */ @RestController -@Api(value = "ScheduleController", description = "Schedule Controller") +@Api(value = "ScheduleController") public class ScheduleController { private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class); @@ -58,33 +52,29 @@ public class ScheduleController { this.schedulerConfig = schedulerConfig; } - public Mono<ResponseEntity<String>> startTasks() { - logger.trace("Start scheduling worker request"); - return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); - } - - @RequestMapping(value = "start", method = RequestMethod.GET) + @GetMapping("/start") @ApiOperation(value = "Start scheduling worker request") public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) { - String requestId = headers.getFirst(X_ONAP_REQUEST_ID); - if (StringUtils.isBlank(requestId)) { - requestId = UUID.randomUUID().toString(); - } - String invocationId = headers.getFirst(X_INVOCATION_ID); - if (StringUtils.isBlank(invocationId)) { - invocationId = UUID.randomUUID().toString(); - } - MDC.put(REQUEST_ID, requestId); - MDC.put(INVOCATION_ID, invocationId); - logger.trace("Receiving start scheduling worker request"); - return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(MappedDiagnosticContext.ENTRY, "Start request"); + Mono<ResponseEntity<String>> response = startTasks(); + logger.info(MappedDiagnosticContext.EXIT, "Start request"); + return response; + } + + public Mono<ResponseEntity<String>> startTasks() { + return Mono.fromSupplier(schedulerConfig::tryToStartTask) // + .map(this::createStartTaskResponse); } - @RequestMapping(value = "stopDatafile", method = RequestMethod.GET) + @GetMapping("/stopDatafile") @ApiOperation(value = "Receiving stop scheduling worker request") - public Mono<ResponseEntity<String>> stopTask() { - logger.trace("Receiving stop scheduling worker request"); - return schedulerConfig.getResponseFromCancellationOfTasks(); + public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(MappedDiagnosticContext.ENTRY, "Stop request"); + Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks(); + logger.info(MappedDiagnosticContext.EXIT, "Stop request"); + return response; } @ApiOperation(value = "Sends success or error response on starting task execution") diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 8c1a2cf4..3546a08f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -20,10 +20,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; - import com.google.gson.JsonElement; import com.google.gson.JsonParser; import java.io.IOException; @@ -32,7 +28,6 @@ import java.net.URI; import java.nio.file.Path; import java.time.Duration; import java.util.Map; -import java.util.UUID; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; @@ -40,9 +35,9 @@ import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +63,7 @@ public class DataRouterPublisher { private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; - private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; + private DmaapProducerHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -86,7 +81,7 @@ public class DataRouterPublisher { */ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.trace("Publish called with arg {}", model); dmaapProducerReactiveHttpClient = resolveClient(); @@ -101,18 +96,13 @@ public class DataRouterPublisher { logger.trace("Entering publishFile with {}", consumerDmaapModel); try { HttpPut put = new HttpPut(); - String requestId = MDC.get(REQUEST_ID); - put.addHeader(X_ONAP_REQUEST_ID, requestId); - String invocationId = UUID.randomUUID().toString(); - put.addHeader(X_INVOCATION_ID, invocationId); - prepareHead(consumerDmaapModel, put); prepareBody(consumerDmaapModel, put); dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); HttpResponse response = dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap); - logger.trace(response.toString()); + logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); @@ -127,6 +117,7 @@ public class DataRouterPublisher { metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); put.addHeader(X_DMAAP_DR_META, metaData.toString()); put.setURI(getPublishUri(model.getName())); + MappedDiagnosticContext.appendTraceInfo(put); } private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { @@ -145,7 +136,7 @@ public class DataRouterPublisher { private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); return Mono.just(model); @@ -164,7 +155,7 @@ public class DataRouterPublisher { return datafileAppConfig.getDmaapPublisherConfiguration(); } - DmaapProducerReactiveHttpClient resolveClient() { - return new DmaapProducerReactiveHttpClient(resolveConfiguration()); + DmaapProducerHttpClient resolveClient() { + return new DmaapProducerHttpClient(resolveConfiguration()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 158bcb29..3e444af0 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -31,9 +31,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import reactor.core.publisher.Mono; @@ -51,7 +51,7 @@ public class FileCollector { public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.trace("Entering execute with {}", fileData); return Mono.just(fileData) // @@ -61,7 +61,7 @@ public class FileCollector { } private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 41f8e3cd..89fa259c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -20,14 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; - import java.io.InputStream; import java.net.URI; +import java.time.Duration; import java.util.Map; -import java.util.UUID; import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; @@ -35,8 +31,8 @@ import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +47,7 @@ import org.slf4j.MDC; public class PublishedChecker { private static final String FEEDLOG_TOPIC = "feedlog"; private static final String DEFAULT_FEED_ID = "1"; + private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -73,20 +70,18 @@ public class PublishedChecker { * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. */ public boolean execute(String fileName, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - DmaapProducerReactiveHttpClient producerClient = resolveClient(); + MDC.setContextMap(contextMap); + DmaapProducerHttpClient producerClient = resolveClient(); HttpGet getRequest = new HttpGet(); - String requestId = MDC.get(REQUEST_ID); - getRequest.addHeader(X_ONAP_REQUEST_ID, requestId); - String invocationId = UUID.randomUUID().toString(); - getRequest.addHeader(X_INVOCATION_ID, invocationId); + MappedDiagnosticContext.appendTraceInfo(getRequest); + getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); producerClient.addUserCredentialsToHead(getRequest); try { HttpResponse response = - producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap); + producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); logger.trace("{}", response); int status = response.getStatusLine().getStatusCode(); @@ -96,12 +91,12 @@ public class PublishedChecker { return HttpStatus.SC_OK == status && !"[]".equals(body); } } catch (Exception e) { - logger.warn("Unable to check if file has been published.", e); + logger.warn("Unable to check if file has been published, file: {}", fileName, e); return false; } } - private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) { + private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) { return producerClient.getBaseUri() // .pathSegment(FEEDLOG_TOPIC) // .pathSegment(DEFAULT_FEED_ID) // @@ -114,7 +109,7 @@ public class PublishedChecker { return appConfig.getDmaapPublisherConfiguration(); } - protected DmaapProducerReactiveHttpClient resolveClient() { - return new DmaapProducerReactiveHttpClient(resolveConfiguration()); + protected DmaapProducerHttpClient resolveClient() { + return new DmaapProducerHttpClient(resolveConfiguration()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 2a6e4c0d..8b496ba2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -26,7 +26,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,13 +72,15 @@ public class ScheduledTasks { /** * Main function for scheduling for the file collection Workflow. */ - public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { + public void executeDatafileMainTask() { try { - MdcVariables.setMdcContextMap(contextMap); + Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); - createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), - () -> onComplete(contextMap)); + createMainTask(context) // + .subscribe(model -> onSuccess(model, context), // + thr -> onError(thr, context), // + () -> onComplete(context)); } catch (Exception e) { logger.error("Unexpected exception: ", e); } @@ -126,17 +128,17 @@ public class ScheduledTasks { } private void onComplete(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.trace("Datafile tasks have been completed"); } private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.info("Datafile file published {}", model.getInternalLocation()); } private void onError(Throwable throwable, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } @@ -150,14 +152,14 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); return createFileCollector() .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFilePath, contextMap); @@ -167,7 +169,7 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); return createDataRouterPublisher() .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) @@ -175,7 +177,7 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); @@ -201,14 +203,14 @@ public class ScheduledTasks { } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } private void deleteFile(Path localFile, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); + MDC.setContextMap(contextMap); logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); |