aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java
diff options
context:
space:
mode:
authorMarcin Migdal <marcin.migdal@nokia.com>2019-02-27 17:10:03 +0100
committerMarcin Migdal <marcin.migdal@nokia.com>2019-02-28 15:02:35 +0100
commit6493e10bc206313bd92a6b4f760b0a7e3d6ee641 (patch)
tree87bc12e63eeb9301850d8e443be05ed02f2d6679 /rest-services/dmaap-client/src/main/java
parent5e6c996472969e50b3da60a06559c3231218637e (diff)
Remove Spring stuff from DMaaP
Change-Id: Id441d0dce89fcb24b5558af7bdf996a74eba78e6 Issue-ID: DCAEGEN2-1245 Signed-off-by: Marcin Migdal <marcin.migdal@nokia.com>
Diffstat (limited to 'rest-services/dmaap-client/src/main/java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java64
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java55
2 files changed, 35 insertions, 84 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
index d92aef9c..99f70209 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -20,19 +20,16 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
-import java.util.function.Consumer;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
-import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -41,6 +38,8 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPConsumerReactiveHttpClient {
+ private final static String SLASH = "/";
+ private final static String CONTENT_TYPE = "Content-Type";
private final String dmaapHostName;
private final String dmaapProtocol;
private final Integer dmaapPortNumber;
@@ -48,15 +47,16 @@ public class DMaaPConsumerReactiveHttpClient {
private final String consumerGroup;
private final String consumerId;
private final String contentType;
- private final WebClient webClient;
- private final static String SLASH="/";
+ private final CloudHttpClient cloudHttpClient;
/**
* Constructor of DMaaPConsumerReactiveHttpClient.
*
* @param consumerConfiguration - DMaaP consumer configuration object
*/
- public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration, WebClient webClient) {
+
+ public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration,
+ CloudHttpClient cloudHttpClient) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
@@ -64,7 +64,7 @@ public class DMaaPConsumerReactiveHttpClient {
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
this.contentType = consumerConfiguration.dmaapContentType();
- this.webClient = webClient;
+ this.cloudHttpClient = cloudHttpClient;
}
/**
@@ -72,30 +72,15 @@ public class DMaaPConsumerReactiveHttpClient {
*
* @return reactive response from DMaaP in string format
*/
- public Mono<String> getDMaaPConsumerResponse() {
- return webClient
- .get()
- .uri(getUri())
- .headers(getHeaders())
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
- .bodyToMono(String.class);
- }
-
- private Consumer<HttpHeaders> getHeaders() {
- return httpHeaders -> {
- httpHeaders.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
- httpHeaders.set(X_INVOCATION_ID, UUID.randomUUID().toString());
- httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
- };
- }
-
- private String createRequestPath() {
- return new StringBuilder().append(SLASH).append(dmaapTopicName).append(SLASH).append(consumerGroup).append(SLASH).append(consumerId).toString();
+ public Mono<String> getDMaaPConsumerResponse(Optional<RequestDiagnosticContext> requestDiagnosticContextOptional) {
+ Map<String,String> headers = new HashMap<>();
+ headers.put(CONTENT_TYPE,contentType);
+ if (requestDiagnosticContextOptional.isPresent()) {
+ return cloudHttpClient.get(getUri().toString(), requestDiagnosticContextOptional.get(),headers, String.class);
+ }
+ RequestDiagnosticContext requestDiagnosticContext = ImmutableRequestDiagnosticContext.builder()
+ .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build();
+ return cloudHttpClient.get(getUri().toString(), requestDiagnosticContext, headers, String.class);
}
URI getUri() {
@@ -103,4 +88,9 @@ public class DMaaPConsumerReactiveHttpClient {
new URIBuilder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber).path(createRequestPath())
.build().toString());
}
+
+ private String createRequestPath() {
+ return new StringBuilder().append(SLASH).append(dmaapTopicName).append(SLASH).append(consumerGroup)
+ .append(SLASH).append(consumerId).toString();
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
index b1f2ab02..fba6f188 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
@@ -20,30 +20,17 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-
import io.netty.handler.ssl.SslContext;
import javax.net.ssl.SSLException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.http.client.reactive.ClientHttpConnector;
-import org.springframework.http.client.reactive.ReactorClientHttpConnector;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClient;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
public class DMaaPReactiveWebClientFactory {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final SslFactory sslFactory;
public DMaaPReactiveWebClientFactory() {
@@ -55,49 +42,23 @@ public class DMaaPReactiveWebClientFactory {
}
/**
- * Construct Reactive WebClient with appropriate settings.
+ * Construct CloudHttpClient with appropriate settings.
*
- * @return WebClient
+ * @return CloudHttpClient
*/
- public WebClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+
+ public CloudHttpClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
SslContext sslContext = createSslContext(consumerConfiguration);
- ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
- HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
- return WebClient.builder()
- .clientConnector(reactorClientHttpConnector)
- .filter(logRequest())
- .filter(logResponse())
- .build();
+ return new CloudHttpClient(sslContext);
}
private SslContext createSslContext(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
if (consumerConfiguration.enableDmaapCertAuth()) {
return sslFactory.createSecureContext(
- consumerConfiguration.keyStorePath(), consumerConfiguration.keyStorePasswordPath(),
- consumerConfiguration.trustStorePath(), consumerConfiguration.trustStorePasswordPath()
+ consumerConfiguration.keyStorePath(), consumerConfiguration.keyStorePasswordPath(),
+ consumerConfiguration.trustStorePath(), consumerConfiguration.trustStorePasswordPath()
);
}
return sslFactory.createInsecureContext();
}
-
- private ExchangeFilterFunction logResponse() {
- return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
- MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
- logger.info("Response Status {}", clientResponse.statusCode());
- MDC.remove(RESPONSE_CODE);
- return Mono.just(clientResponse);
- });
- }
-
- private ExchangeFilterFunction logRequest() {
- return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
- MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
- logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
- clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
- MDC.remove(SERVICE_NAME);
- return Mono.just(clientRequest);
- });
- }
-
}