aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2021-03-11 11:51:30 +0100
committertkogut <tomasz.kogut@nokia.com>2021-03-16 09:35:35 +0100
commit4acad60f6fa6ab803edd22d7fa8e55fda40fbb41 (patch)
treebe5a43a5549c58b73ed11d4fafa3bb923e8764b0 /rest-services/dmaap-client/src/main/java/org
parentea4a776bfd70f3a01ea01583f1c5af70a98576de (diff)
Support authorized topics in DMaaP-Client (HTTP Basic-Auth)
Issue-ID: DCAEGEN2-2670 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: I9f12a16df57c2ddbec457ff017e148f7c19ed20d
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java28
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java27
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java27
3 files changed, 68 insertions, 14 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
index 5211807a..2bb04df4 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +20,18 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
+import com.google.common.primitives.Bytes;
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import io.vavr.control.Option;
+import org.apache.commons.lang3.ArrayUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since April 2019
@@ -35,4 +45,20 @@ final class Commons {
return String.format("%d %s%n%s", httpResponse.statusCode(), httpResponse.statusReason(),
httpResponse.bodyAsString());
}
+
+ static Tuple2<String, String> basicAuthHeader(AafCredentials credentials) {
+ Charset utf8 = StandardCharsets.UTF_8;
+ byte[] username = toBytes(credentials.username(), utf8);
+ byte[] separator = ":".getBytes(utf8);
+ byte[] password = toBytes(credentials.password(), utf8);
+ byte[] combined = ArrayUtils.addAll(Bytes.concat(username, separator, password));
+ String userCredentials = Base64.getEncoder().encodeToString(combined);
+ return Tuple.of("Authorization", "Basic " + userCredentials);
+ }
+
+ private static byte[] toBytes(String text, Charset charset) {
+ return Option.of(text)
+ .map(s -> s.getBytes(charset))
+ .getOrElse(new byte[0]);
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 5f72808e..6e4679c3 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonElement;
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
+import io.vavr.collection.Map;
import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -43,6 +44,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,16 +112,14 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
}
private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
- ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
+ return ImmutableHttpRequest.builder()
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
.diagnosticContext(request.diagnosticContext().withNewInvocationId())
- .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
- .body(body);
-
- return Option.of(request.timeoutConfig())
- .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
- .getOrElse(requestBuilder::build);
+ .customHeaders(headers(request))
+ .body(body)
+ .timeout(timeout(request).getOrNull())
+ .build();
}
private MessageRouterPublishResponse buildResponse(
@@ -138,4 +138,17 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
.failReason(failReason)
.build());
}
+
+ private Option<Duration> timeout(MessageRouterPublishRequest request) {
+ return Option.of(request.timeoutConfig())
+ .map(DmaapTimeoutConfig::getTimeout);
+ }
+
+ private Map<String, String> headers(MessageRouterPublishRequest request) {
+ Map<String, String> headers = Option.of(request.sinkDefinition().aafCredentials())
+ .map(Commons::basicAuthHeader)
+ .map(HashMap::of)
+ .getOrElse(HashMap.empty());
+ return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString());
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index acb297ab..d98e8d3a 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -25,7 +25,9 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashMap;
import io.vavr.collection.List;
+import io.vavr.collection.Map;
import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -41,12 +43,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
@@ -81,14 +85,13 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
}
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
- ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
+ return ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
.url(buildSubscribeUrl(request))
- .diagnosticContext(request.diagnosticContext().withNewInvocationId());
-
- return Option.of(request.timeoutConfig())
- .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
- .getOrElse(requestBuilder::build);
+ .diagnosticContext(request.diagnosticContext().withNewInvocationId())
+ .customHeaders(headers(request))
+ .timeout(timeout(request).getOrNull())
+ .build();
}
private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
@@ -117,4 +120,16 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
.failReason(failReason)
.build());
}
+
+ private Option<Duration> timeout(MessageRouterSubscribeRequest request) {
+ return Option.of(request.timeoutConfig())
+ .map(DmaapTimeoutConfig::getTimeout);
+ }
+
+ private Map<String, String> headers(MessageRouterSubscribeRequest request) {
+ return Option.of(request.sourceDefinition().aafCredentials())
+ .map(Commons::basicAuthHeader)
+ .map(HashMap::of)
+ .getOrElse(HashMap.empty());
+ }
}