aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java9
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java29
2 files changed, 28 insertions, 10 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index e99b8114..21266fbc 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.service;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.SERVICE_NAME;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.Builder;
-
import reactor.core.publisher.Mono;
/**
@@ -68,17 +69,21 @@ public class DmaapReactiveWebClient {
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
return Mono.just(clientResponse);
});
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
.forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
return Mono.just(clientRequest);
});
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 4869e4c2..f80fcd0f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,9 +16,11 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -28,10 +30,10 @@ import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Future;
-
import javax.net.ssl.SSLContext;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
@@ -45,14 +47,17 @@ import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
import reactor.core.publisher.Mono;
/**
@@ -66,6 +71,8 @@ public class DmaapProducerReactiveHttpClient {
private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
private static final String URI_SEPARATOR = "/";
private static final String DEFAULT_FEED_ID = "1";
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -101,11 +108,11 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
+ Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
-
webClient = getWebClient();
webClient.start();
@@ -114,9 +121,10 @@ public class DmaapProducerReactiveHttpClient {
prepareBody(consumerDmaapModel, put);
addUserCredentialsToHead(put);
+ logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
Future<HttpResponse> future = webClient.execute(put, null);
HttpResponse response = future.get();
- logger.trace("{}", response);
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
webClient.close();
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -141,6 +149,11 @@ public class DmaapProducerReactiveHttpClient {
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
+
+ String requestID = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestID);
+ String invocationID = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationID);
}
private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {