diff options
author | tkogut <tomasz.kogut@nokia.com> | 2021-03-11 11:51:30 +0100 |
---|---|---|
committer | tkogut <tomasz.kogut@nokia.com> | 2021-03-16 09:35:35 +0100 |
commit | 4acad60f6fa6ab803edd22d7fa8e55fda40fbb41 (patch) | |
tree | be5a43a5549c58b73ed11d4fafa3bb923e8764b0 /rest-services/dmaap-client/src/main | |
parent | ea4a776bfd70f3a01ea01583f1c5af70a98576de (diff) |
Support authorized topics in DMaaP-Client (HTTP Basic-Auth)
Issue-ID: DCAEGEN2-2670
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: I9f12a16df57c2ddbec457ff017e148f7c19ed20d
Diffstat (limited to 'rest-services/dmaap-client/src/main')
3 files changed, 68 insertions, 14 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java index 5211807a..2bb04df4 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; +import com.google.common.primitives.Bytes; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.control.Option; +import org.apache.commons.lang3.ArrayUtils; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 @@ -35,4 +45,20 @@ final class Commons { return String.format("%d %s%n%s", httpResponse.statusCode(), httpResponse.statusReason(), httpResponse.bodyAsString()); } + + static Tuple2<String, String> basicAuthHeader(AafCredentials credentials) { + Charset utf8 = StandardCharsets.UTF_8; + byte[] username = toBytes(credentials.username(), utf8); + byte[] separator = ":".getBytes(utf8); + byte[] password = toBytes(credentials.password(), utf8); + byte[] combined = ArrayUtils.addAll(Bytes.concat(username, separator, password)); + String userCredentials = Base64.getEncoder().encodeToString(combined); + return Tuple.of("Authorization", "Basic " + userCredentials); + } + + private static byte[] toBytes(String text, Charset charset) { + return Option.of(text) + .map(s -> s.getBytes(charset)) + .getOrElse(new byte[0]); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 5f72808e..6e4679c3 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -25,6 +25,7 @@ import com.google.gson.JsonElement; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; @@ -43,6 +44,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,16 +112,14 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { - ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() + return ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) - .body(body); - - return Option.of(request.timeoutConfig()) - .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) - .getOrElse(requestBuilder::build); + .customHeaders(headers(request)) + .body(body) + .timeout(timeout(request).getOrNull()) + .build(); } private MessageRouterPublishResponse buildResponse( @@ -138,4 +138,17 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .failReason(failReason) .build()); } + + private Option<Duration> timeout(MessageRouterPublishRequest request) { + return Option.of(request.timeoutConfig()) + .map(DmaapTimeoutConfig::getTimeout); + } + + private Map<String, String> headers(MessageRouterPublishRequest request) { + Map<String, String> headers = Option.of(request.sinkDefinition().aafCredentials()) + .map(Commons::basicAuthHeader) + .map(HashMap::of) + .getOrElse(HashMap.empty()); + return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString()); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index acb297ab..d98e8d3a 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -25,7 +25,9 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashMap; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -41,12 +43,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.net.ConnectException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; @@ -81,14 +85,13 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { } private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { - ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() + return ImmutableHttpRequest.builder() .method(HttpMethod.GET) .url(buildSubscribeUrl(request)) - .diagnosticContext(request.diagnosticContext().withNewInvocationId()); - - return Option.of(request.timeoutConfig()) - .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) - .getOrElse(requestBuilder::build); + .diagnosticContext(request.diagnosticContext().withNewInvocationId()) + .customHeaders(headers(request)) + .timeout(timeout(request).getOrNull()) + .build(); } private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) { @@ -117,4 +120,16 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .failReason(failReason) .build()); } + + private Option<Duration> timeout(MessageRouterSubscribeRequest request) { + return Option.of(request.timeoutConfig()) + .map(DmaapTimeoutConfig::getTimeout); + } + + private Map<String, String> headers(MessageRouterSubscribeRequest request) { + return Option.of(request.sourceDefinition().aafCredentials()) + .map(Commons::basicAuthHeader) + .map(HashMap::of) + .getOrElse(HashMap.empty()); + } } |