diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java new file mode 100644 index 00000000..5f5ccddf --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME; +import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.Builder; + +import reactor.core.publisher.Mono; + +/** + * Web client for the DMaaP MessageRouter. + * + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 + */ +public class DmaapWebClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String contentType; + private String dmaapUserName; + private String dmaapUserPassword; + + /** + * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. + * + * @param dmaapCustomConfig - configuration object + * @return DmaapReactiveWebClient + */ + public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { + this.contentType = dmaapCustomConfig.dmaapContentType(); + return this; + } + + /** + * Construct Reactive WebClient with appropriate settings. + * + * @return WebClient + */ + public WebClient build() { + Builder webClientBuilder = WebClient.builder() // + .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) // + .filter(logRequest()) // + .filter(logResponse()); + if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null + && !dmaapUserPassword.isEmpty()) { + webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword)); + } + return webClientBuilder.build(); + } + + private ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); + logger.trace("Response Status {}", clientResponse.statusCode()); + MDC.remove(RESPONSE_CODE); + return Mono.just(clientResponse); + }); + } + + private ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); + logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value))); + logger.trace("HTTP request headers: {}", clientRequest.headers()); + MDC.remove(SERVICE_NAME); + return Mono.just(clientRequest); + }); + } +} |