aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/pom.xml11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java45
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java42
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java30
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java5
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java25
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java6
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java11
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java2
-rw-r--r--datafile-commons/pom.xml10
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java92
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java42
-rw-r--r--datafile-dmaap-client/pom.xml8
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java15
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java (renamed from datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java)30
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java (renamed from datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java)16
22 files changed, 299 insertions, 298 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 59f8f3b7..794470d6 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -174,12 +174,7 @@
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-launcher</artifactId>
- <scope>test</scope>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@@ -219,5 +214,9 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0254597e..0150d86c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -17,10 +17,11 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
+
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -33,7 +34,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Flux;
+
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -56,50 +58,51 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(
+ ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
-
- protected void runTask(Map<String, String> contextMap) {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
- .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ public void runTask() {
+ Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
+ EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
+ .subscribeOn(Schedulers.parallel()) //
+ .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
+ .flatMap(this::parseCloudConfig) //
+ .subscribe(null, this::onError, this::onComplete);
}
- private void parsingConfigError(Throwable throwable) {
- logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ private void onComplete() {
+ logger.trace("Configuration updated");
}
- private void cloudConfigError(Throwable throwable) {
+ private void onError(Throwable throwable) {
logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
- private void parsingConfigSuccess(EnvProperties envProperties) {
- logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
- reactiveCloudConfigurationProvider.callForServiceConfigurationReactive(envProperties)
- .subscribe(this::parseCloudConfig, this::cloudConfigError);
- }
-
- private synchronized void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
+ return Mono.just(this);
}
@Override
public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
+ : super.getDmaapPublisherConfiguration();
}
@Override
public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
+ return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
+ : super.getDmaapConsumerConfiguration();
}
@Override
public synchronized FtpesConfig getFtpesConfiguration() {
- return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
+ return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 8f417261..969a7e05 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -2,17 +2,15 @@
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END==========================================================================
*/
@@ -21,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -37,17 +37,19 @@ class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
- private EnvironmentProcessor() {
- }
+ private EnvironmentProcessor() {}
- static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Loading configuration from system environment variables");
+ static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
+ MDC.setContextMap(contextMap);
+ logger.trace("Loading configuration from system environment variables");
EnvProperties envProperties;
try {
- envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
- .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
- .appName(getService(systemEnvironment)).build();
+ envProperties = ImmutableEnvProperties.builder() //
+ .consulHost(getConsulHost(systemEnvironment)) //
+ .consulPort(getConsultPort(systemEnvironment)) //
+ .cbsName(getConfigBindingService(systemEnvironment)) //
+ .appName(getService(systemEnvironment)) //
+ .build();
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
@@ -57,29 +59,29 @@ class EnvironmentProcessor {
private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
}
private static Integer getConsultPort(Properties systemEnvironments) {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "$CONFIG_BINDING_SERVICE environment has not been defined"));
}
private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ return Optional
+ .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
}
private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT environment has not been defined");
logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
return DEFAULT_CONSUL_PORT;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index d5c8b3b2..58c77a11 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,27 +16,20 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
@@ -61,8 +54,6 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
- private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
- private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
@@ -94,8 +85,8 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
- MdcVariables.setMdcContextMap(contextMap);
- logger.info(EXIT, "Stopped Datafile workflow");
+ MDC.setContextMap(contextMap);
+ logger.info("Stopped Datafile workflow");
MDC.clear();
return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
@@ -108,21 +99,13 @@ public class SchedulerConfig {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- String requestId = MDC.get(REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- MDC.put(REQUEST_ID, UUID.randomUUID().toString());
- }
- String invocationId = MDC.get(INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
- }
- contextMap = MDC.getCopyOfContextMap();
- logger.info(ENTRY, "Start scheduling Datafile workflow");
+ contextMap = MappedDiagnosticContext.initializeTraceContext();
+ logger.info("Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index d6c535f0..073c8462 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -2,28 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.controllers;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
@@ -42,19 +42,19 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
- @RequestMapping(value = "heartbeat", method = RequestMethod.GET)
+ @GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
- @ApiResponses(value = {
+ @ApiResponses(value = { //
@ApiResponse(code = 200, message = "DATAFILE service is living"),
@ApiResponse(code = 401, message = "You are not authorized to view the resource"),
@ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
- }
- )
- public Mono<ResponseEntity<String>> heartbeat() {
- logger.trace("Receiving heartbeat request");
- return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK))
- );
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+ public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Heartbeat request");
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK));
+ logger.trace("Heartbeat");
+ logger.info(MappedDiagnosticContext.EXIT, "Heartbeat request");
+ return response;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index a21ed041..42949f95 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -18,24 +18,18 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-import java.util.UUID;
-import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -46,7 +40,7 @@ import reactor.core.publisher.Mono;
*/
@RestController
-@Api(value = "ScheduleController", description = "Schedule Controller")
+@Api(value = "ScheduleController")
public class ScheduleController {
private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
@@ -58,33 +52,29 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
- public Mono<ResponseEntity<String>> startTasks() {
- logger.trace("Start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
- }
-
- @RequestMapping(value = "start", method = RequestMethod.GET)
+ @GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
- String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
- if (StringUtils.isBlank(requestId)) {
- requestId = UUID.randomUUID().toString();
- }
- String invocationId = headers.getFirst(X_INVOCATION_ID);
- if (StringUtils.isBlank(invocationId)) {
- invocationId = UUID.randomUUID().toString();
- }
- MDC.put(REQUEST_ID, requestId);
- MDC.put(INVOCATION_ID, invocationId);
- logger.trace("Receiving start scheduling worker request");
- return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Start request");
+ Mono<ResponseEntity<String>> response = startTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Start request");
+ return response;
+ }
+
+ public Mono<ResponseEntity<String>> startTasks() {
+ return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
+ .map(this::createStartTaskResponse);
}
- @RequestMapping(value = "stopDatafile", method = RequestMethod.GET)
+ @GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
- public Mono<ResponseEntity<String>> stopTask() {
- logger.trace("Receiving stop scheduling worker request");
- return schedulerConfig.getResponseFromCancellationOfTasks();
+ public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(MappedDiagnosticContext.ENTRY, "Stop request");
+ Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks();
+ logger.info(MappedDiagnosticContext.EXIT, "Stop request");
+ return response;
}
@ApiOperation(value = "Sends success or error response on starting task execution")
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 8c1a2cf4..3546a08f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -20,10 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
@@ -32,7 +28,6 @@ import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -40,9 +35,9 @@ import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +63,7 @@ public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -86,7 +81,7 @@ public class DataRouterPublisher {
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
@@ -101,18 +96,13 @@ public class DataRouterPublisher {
logger.trace("Entering publishFile with {}", consumerDmaapModel);
try {
HttpPut put = new HttpPut();
- String requestId = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationId);
-
prepareHead(consumerDmaapModel, put);
prepareBody(consumerDmaapModel, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
@@ -127,6 +117,7 @@ public class DataRouterPublisher {
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getPublishUri(model.getName()));
+ MappedDiagnosticContext.appendTraceInfo(put);
}
private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
@@ -145,7 +136,7 @@ public class DataRouterPublisher {
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return Mono.just(model);
@@ -164,7 +155,7 @@ public class DataRouterPublisher {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 158bcb29..3e444af0 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -31,9 +31,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import reactor.core.publisher.Mono;
@@ -51,7 +51,7 @@ public class FileCollector {
public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
@@ -61,7 +61,7 @@ public class FileCollector {
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 41f8e3cd..89fa259c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -20,14 +20,10 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import java.io.InputStream;
import java.net.URI;
+import java.time.Duration;
import java.util.Map;
-import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
@@ -35,8 +31,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +47,7 @@ import org.slf4j.MDC;
public class PublishedChecker {
private static final String FEEDLOG_TOPIC = "feedlog";
private static final String DEFAULT_FEED_ID = "1";
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -73,20 +70,18 @@ public class PublishedChecker {
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
public boolean execute(String fileName, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- DmaapProducerReactiveHttpClient producerClient = resolveClient();
+ MDC.setContextMap(contextMap);
+ DmaapProducerHttpClient producerClient = resolveClient();
HttpGet getRequest = new HttpGet();
- String requestId = MDC.get(REQUEST_ID);
- getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
- String invocationId = UUID.randomUUID().toString();
- getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ MappedDiagnosticContext.appendTraceInfo(getRequest);
+
getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
producerClient.addUserCredentialsToHead(getRequest);
try {
HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
@@ -96,12 +91,12 @@ public class PublishedChecker {
return HttpStatus.SC_OK == status && !"[]".equals(body);
}
} catch (Exception e) {
- logger.warn("Unable to check if file has been published.", e);
+ logger.warn("Unable to check if file has been published, file: {}", fileName, e);
return false;
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
return producerClient.getBaseUri() //
.pathSegment(FEEDLOG_TOPIC) //
.pathSegment(DEFAULT_FEED_ID) //
@@ -114,7 +109,7 @@ public class PublishedChecker {
return appConfig.getDmaapPublisherConfiguration();
}
- protected DmaapProducerReactiveHttpClient resolveClient() {
- return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient() {
+ return new DmaapProducerHttpClient(resolveConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 2a6e4c0d..8b496ba2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,13 +72,15 @@ public class ScheduledTasks {
/**
* Main function for scheduling for the file collection Workflow.
*/
- public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+ public void executeDatafileMainTask() {
try {
- MdcVariables.setMdcContextMap(contextMap);
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ createMainTask(context) //
+ .subscribe(model -> onSuccess(model, context), //
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
@@ -126,17 +128,17 @@ public class ScheduledTasks {
}
private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -150,14 +152,14 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createFileCollector()
.execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFilePath, contextMap);
@@ -167,7 +169,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
return createDataRouterPublisher()
.execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
@@ -175,7 +177,7 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
@@ -201,14 +203,14 @@ public class ScheduledTasks {
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
private void deleteFile(Path localFile, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index f41ecf25..7059a7fe 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -16,12 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.integration;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
@@ -56,6 +57,6 @@ class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
}
private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(any());
+ verify(scheduledTask, atLeast(1)).executeDatafileMainTask();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index 98c7dc32..afa51de1 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -34,7 +34,6 @@ import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -48,8 +47,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
@@ -81,11 +78,8 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
-
private static List<ConsumerDmaapModel> listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
- private static AppConfig appConfig;
- private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
private DMaaPConsumerReactiveHttpClient httpClientMock;
@@ -102,25 +96,6 @@ public class DMaaPMessageConsumerTaskImplTest {
*/
@BeforeAll
public static void setUp() {
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
- .consumerGroup("OpenDCAE-c12") //
- .consumerId("c12") //
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234).dmaapProtocol("https") //
- .dmaapUserName("Datafile") //
- .dmaapUserPassword("Datafile") //
- .dmaapTopicName("unauthenticated.NOTIFICATION") //
- .timeoutMs(-1) //
- .messageLimit(-1) //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- appConfig = mock(AppConfig.class);
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 7a9a17ab..22900b38 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -49,7 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
@@ -84,7 +84,7 @@ class DataRouterPublisherTest {
private static final String FILE_CONTENT = "Just a string.";
private static ConsumerDmaapModel consumerDmaapModel;
- private static DmaapProducerReactiveHttpClient httpClientMock;
+ private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
private final Map<String, String> contextMap = new HashMap<>();
@@ -204,7 +204,7 @@ class DataRouterPublisherTest {
@SafeVarargs
final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
throws Exception {
- httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ httpClientMock = mock(DmaapProducerHttpClient.class);
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
index 3e3c2ed6..d5f65150 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -51,7 +50,7 @@ import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriBuilder;
@@ -75,7 +74,7 @@ public class PublishedCheckerTest {
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
private static AppConfig appConfigMock;
- private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
private PublishedChecker publishedCheckerUnderTestSpy;
@@ -103,7 +102,7 @@ public class PublishedCheckerTest {
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
- verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
verifyNoMoreInteractions(httpClientMock);
HttpUriRequest getRequest = requestCaptor.getValue();
@@ -158,10 +157,10 @@ public class PublishedCheckerTest {
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
- when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), any(), any()))
.thenReturn(httpResponseMock);
} else {
- when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), any(), any()))
.thenThrow(exception);
}
HttpEntity httpEntityMock = mock(HttpEntity.class);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index f6beae02..09908f13 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -168,7 +168,7 @@ public class ScheduledTasksTest {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).execute();
- testedObject.scheduleMainDatafileEventTask(any());
+ testedObject.executeDatafileMainTask();
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
diff --git a/datafile-commons/pom.xml b/datafile-commons/pom.xml
index 400ca28a..fdf62c6e 100644
--- a/datafile-commons/pom.xml
+++ b/datafile-commons/pom.xml
@@ -74,6 +74,14 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
new file mode 100644
index 00000000..bda889c2
--- /dev/null
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * Support functions for MDC
+ */
+public final class MappedDiagnosticContext {
+
+ private static final String X_ONAP_REQUEST_ID = "X-ONAP-RequestID";
+ private static final String X_INVOCATION_ID = "X-InvocationID";
+ private static final String REQUEST_ID = "RequestID";
+ private static final String INVOCATION_ID = "InvocationID";
+ public static final String RESPONSE_CODE = "ResponseCode";
+ public static final String SERVICE_NAME = "ServiceName";
+
+ public static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ public static final Marker EXIT = MarkerFactory.getMarker("EXIT");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class);
+
+ private MappedDiagnosticContext() {}
+
+ /**
+ * Inserts the relevant trace information in the HTTP header
+ * @param httpRequest a request
+ */
+ public static void appendTraceInfo(HttpRequestBase httpRequest) {
+ String requestId = MDC.get(REQUEST_ID);
+ httpRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ httpRequest.addHeader("X-RequestID", requestId); // deprecated
+ httpRequest.addHeader("X-TransactionID", requestId); // deprecated
+
+ String invocationId = UUID.randomUUID().toString();
+ httpRequest.addHeader(X_INVOCATION_ID, invocationId);
+ logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId);
+ }
+
+ /**
+ * Initialize MDC from relevant information in a received HTTP header
+ * @param headers a received HTPP header
+ */
+ public static void initializeTraceContext(HttpHeaders headers) {
+ String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(REQUEST_ID, requestId);
+ MDC.put(INVOCATION_ID, invocationId);
+ }
+
+ /**
+ * Initialize the MDC when a new context is started.
+ * @return a copy of the new trace context
+ */
+ public static Map<String, String> initializeTraceContext() {
+ MDC.clear();
+ MDC.put(REQUEST_ID, UUID.randomUUID().toString());
+ return MDC.getCopyOfContextMap();
+ }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java
deleted file mode 100644
index 9d882067..00000000
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.logging;
-
-import java.util.Map;
-import org.slf4j.MDC;
-
-public final class MdcVariables {
-
- public static final String X_ONAP_REQUEST_ID = "X-ONAP-RequestID";
- public static final String X_INVOCATION_ID = "X-InvocationID";
- public static final String REQUEST_ID = "RequestID";
- public static final String INVOCATION_ID = "InvocationID";
- public static final String INSTANCE_ID = "InstanceID";
- public static final String RESPONSE_CODE = "ResponseCode";
- public static final String SERVICE_NAME = "ServiceName";
-
- private MdcVariables() {
- }
-
- public static void setMdcContextMap(Map<String, String> mdcContextMap) {
- if (mdcContextMap != null) {
- MDC.setContextMap(mdcContextMap);
- }
- }
-}
diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml
index 0f3cf6aa..8d972573 100644
--- a/datafile-dmaap-client/pom.xml
+++ b/datafile-dmaap-client/pom.xml
@@ -116,13 +116,7 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
-
- <!-- https://mvnrepository.com/artifact/org.junit.platform/junit-platform-launcher -->
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-launcher</artifactId>
- <scope>test</scope>
- </dependency>
+
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpmime -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 21266fbc..23fd0bc7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -16,9 +16,10 @@
package org.onap.dcaegen2.collectors.datafile.service;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.SERVICE_NAME;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.RESPONSE_CODE;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.SERVICE_NAME;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.Builder;
+
import reactor.core.publisher.Mono;
/**
@@ -57,12 +59,13 @@ public class DmaapReactiveWebClient {
* @return WebClient
*/
public WebClient build() {
- Builder webClientBuilder = WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
- .filter(logRequest()).filter(logResponse());
+ Builder webClientBuilder = WebClient.builder()
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) //
+ .filter(logRequest()) //
+ .filter(logResponse());
if (dmaaPUserName != null && !dmaaPUserName.isEmpty() && dmaaPUserPassword != null
&& !dmaaPUserPassword.isEmpty()) {
webClientBuilder.filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword));
-
}
return webClientBuilder.build();
}
@@ -81,7 +84,7 @@ public class DmaapReactiveWebClient {
MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
logger.trace("HTTP request headers: {}", clientRequest.headers());
MDC.remove(SERVICE_NAME);
return Mono.just(clientRequest);
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index 944d3b34..b0904b29 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
@@ -35,11 +36,11 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.web.util.DefaultUriBuilderFactory;
@@ -49,9 +50,9 @@ import org.springframework.web.util.UriBuilder;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapProducerReactiveHttpClient {
+public class DmaapProducerHttpClient {
- private static final int NO_REQUEST_TIMEOUT = -1;
+ private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(2);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
@@ -64,14 +65,14 @@ public class DmaapProducerReactiveHttpClient {
*
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
- public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.configuration = dmaapPublisherConfiguration;
}
public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
throws DatafileTaskException {
- try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
- MdcVariables.setMdcContextMap(contextMap);
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT)) {
webClient.start();
logger.trace(INVOKE, "Starting to produce to DR {}", request);
@@ -84,10 +85,10 @@ public class DmaapProducerReactiveHttpClient {
}
}
- public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout,
Map<String, String> contextMap) throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
- MdcVariables.setMdcContextMap(contextMap);
webClient.start();
logger.trace(INVOKE, "Starting to produce to DR {}", request);
@@ -116,7 +117,7 @@ public class DmaapProducerReactiveHttpClient {
.port(configuration.dmaapPortNumber());
}
- private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout)
throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
@@ -129,14 +130,17 @@ public class DmaapProducerReactiveHttpClient {
clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
}
- if (requestTimeout > NO_REQUEST_TIMEOUT) {
+ if (requestTimeout.toMillis() > 0) {
+ int millis = (int)requestTimeout.toMillis();
RequestConfig requestConfig = RequestConfig.custom() //
- .setSocketTimeout(requestTimeout) //
- .setConnectTimeout(requestTimeout) //
- .setConnectionRequestTimeout(requestTimeout) //
+ .setSocketTimeout(millis) //
+ .setConnectTimeout(millis) //
+ .setConnectionRequestTimeout(millis) //
.build();
clientBuilder.setDefaultRequestConfig(requestConfig);
+ } else {
+ logger.error("WEB client without timeout created {}", requestTimeout);
}
return clientBuilder.build();
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index 91c4c334..92a14997 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
@@ -59,18 +60,18 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapProducerReactiveHttpClientTest {
+class DmaapProducerHttpClientTest {
private static final String HOST = "54.45.33.2";
private static final String HTTPS_SCHEME = "https";
private static final int PORT = 1234;
private static final String USER_NAME = "dradmin";
- private static final int TWO_SECOND_TIMEOUT = 2000;
+ private static final Duration TWO_SECOND_TIMEOUT = Duration.ofSeconds(2);
private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
- private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
+ private DmaapProducerHttpClient producerClientUnderTestSpy;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
@@ -88,7 +89,7 @@ class DmaapProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
- producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
+ producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock));
clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
clientMock = mock(CloseableHttpAsyncClient.class);
@@ -110,6 +111,7 @@ class DmaapProducerReactiveHttpClientTest {
verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).setDefaultRequestConfig(any());
verify(clientBuilderMock).build();
verifyNoMoreInteractions(clientBuilderMock);
@@ -138,9 +140,9 @@ class DmaapProducerReactiveHttpClientTest {
verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
RequestConfig requestConfig = requestConfigCaptor.getValue();
- assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
- assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
- assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getConnectionRequestTimeout());
verify(clientBuilderMock).build();
verifyNoMoreInteractions(clientBuilderMock);