diff options
Diffstat (limited to 'rest-services/dmaap-client')
8 files changed, 231 insertions, 19 deletions
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index 54b9a37e..813e44dd 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -7,7 +7,7 @@ <parent> <groupId>org.onap.dcaegen2.services.sdk</groupId> <artifactId>dcaegen2-services-sdk-rest-services</artifactId> - <version>1.7.0-SNAPSHOT</version> + <version>1.8.0-SNAPSHOT</version> </parent> <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java index 5211807a..2bb04df4 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; +import com.google.common.primitives.Bytes; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.control.Option; +import org.apache.commons.lang3.ArrayUtils; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 @@ -35,4 +45,20 @@ final class Commons { return String.format("%d %s%n%s", httpResponse.statusCode(), httpResponse.statusReason(), httpResponse.bodyAsString()); } + + static Tuple2<String, String> basicAuthHeader(AafCredentials credentials) { + Charset utf8 = StandardCharsets.UTF_8; + byte[] username = toBytes(credentials.username(), utf8); + byte[] separator = ":".getBytes(utf8); + byte[] password = toBytes(credentials.password(), utf8); + byte[] combined = ArrayUtils.addAll(Bytes.concat(username, separator, password)); + String userCredentials = Base64.getEncoder().encodeToString(combined); + return Tuple.of("Authorization", "Basic " + userCredentials); + } + + private static byte[] toBytes(String text, Charset charset) { + return Option.of(text) + .map(s -> s.getBytes(charset)) + .getOrElse(new byte[0]); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 5f72808e..6e4679c3 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -25,6 +25,7 @@ import com.google.gson.JsonElement; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; @@ -43,6 +44,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,16 +112,14 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { - ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() + return ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) - .body(body); - - return Option.of(request.timeoutConfig()) - .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) - .getOrElse(requestBuilder::build); + .customHeaders(headers(request)) + .body(body) + .timeout(timeout(request).getOrNull()) + .build(); } private MessageRouterPublishResponse buildResponse( @@ -138,4 +138,17 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .failReason(failReason) .build()); } + + private Option<Duration> timeout(MessageRouterPublishRequest request) { + return Option.of(request.timeoutConfig()) + .map(DmaapTimeoutConfig::getTimeout); + } + + private Map<String, String> headers(MessageRouterPublishRequest request) { + Map<String, String> headers = Option.of(request.sinkDefinition().aafCredentials()) + .map(Commons::basicAuthHeader) + .map(HashMap::of) + .getOrElse(HashMap.empty()); + return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString()); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index acb297ab..d98e8d3a 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -25,7 +25,9 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashMap; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -41,12 +43,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.net.ConnectException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; @@ -81,14 +85,13 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { } private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { - ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() + return ImmutableHttpRequest.builder() .method(HttpMethod.GET) .url(buildSubscribeUrl(request)) - .diagnosticContext(request.diagnosticContext().withNewInvocationId()); - - return Option.of(request.timeoutConfig()) - .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) - .getOrElse(requestBuilder::build); + .diagnosticContext(request.diagnosticContext().withNewInvocationId()) + .customHeaders(headers(request)) + .timeout(timeout(request).getOrNull()) + .build(); } private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) { @@ -117,4 +120,16 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .failReason(failReason) .build()); } + + private Option<Duration> timeout(MessageRouterSubscribeRequest request) { + return Option.of(request.timeoutConfig()) + .map(DmaapTimeoutConfig::getTimeout); + } + + private Map<String, String> headers(MessageRouterSubscribeRequest request) { + return Option.of(request.sourceDefinition().aafCredentials()) + .map(Commons::basicAuthHeader) + .map(HashMap::of) + .getOrElse(HashMap.empty()); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index 1a315806..ebb9af74 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -26,6 +26,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; @@ -62,6 +63,17 @@ public final class MessageRouterTestsUtils { .build(); } + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, String username, String password) { + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl) + .withAafCredentials(ImmutableAafCredentials.builder() + .username(username) + .password(password) + .build())) + .contentType(ContentType.APPLICATION_JSON) + .build(); + } + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType) { return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(createMessageRouterSink(topicUrl)) @@ -83,7 +95,6 @@ public final class MessageRouterTestsUtils { public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, String consumerGroup, String consumerId, Duration timeout) { - return ImmutableMessageRouterSubscribeRequest .builder() .timeoutConfig(ImmutableDmaapTimeoutConfig.builder() @@ -95,6 +106,21 @@ public final class MessageRouterTestsUtils { .build(); } + public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId, + String username, String password) { + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(getImmutableMessageRouterSource(topicUrl) + .withAafCredentials(ImmutableAafCredentials.builder() + .username(username) + .password(password) + .build())) + .consumerGroup(consumerGroup) + .consumerId(consumerId) + .build(); + } + private static ImmutableMessageRouterSource getImmutableMessageRouterSource(String topicUrl) { return ImmutableMessageRouterSource.builder() .name("the topic") diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index ffd301c3..70adf59d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -477,6 +477,7 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Test void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() { //given @@ -541,6 +542,34 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2)); } + @Test + void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() { + //given + final String topic = "TOPIC17"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, "username","password"); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path) + .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); + } + private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() @@ -573,4 +602,4 @@ class MessageRouterPublisherIT { .build()) .build(); } -}
\ No newline at end of file +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 8f4edab0..3d43e817 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -131,7 +131,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriberShouldHandleSingleItemResponse() { + void subscriber_shouldHandleSingleItemResponse() { //given final String topic = "TOPIC"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -415,10 +415,11 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Test void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() { //given - final String topic = "TOPIC4"; + final String topic = "TOPIC10"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, @@ -444,6 +445,40 @@ class MessageRouterSubscriberIT { .verify(TIMEOUT); } + @Test + void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() { + //given + final String topic = "TOPIC11"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, "username", "password"); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path) + .withHeader("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); + } + private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterSubscriberConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java new file mode 100644 index 00000000..72c35925 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; + +import io.vavr.Tuple; +import io.vavr.Tuple2; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; + +import static org.assertj.core.api.Assertions.assertThat; + +class CommonsTest { + + @Test + void shouldCreateBasicAuthHeader() { + // given + AafCredentials credentials = create("username", "password"); + + // when + Tuple2<String, String> basicAuthHeader = Commons.basicAuthHeader(credentials); + + // then + verifyBasicAuthHeader(basicAuthHeader, "dXNlcm5hbWU6cGFzc3dvcmQ="); + } + + @Test + void shouldCreateBasicAuthHeaderForEmpties() { + // given + AafCredentials credentials = create("", ""); + + // when + Tuple2<String, String> basicAuthHeader = Commons.basicAuthHeader(credentials); + + // then + verifyBasicAuthHeader(basicAuthHeader, "Og=="); + } + + private AafCredentials create(String username, String password) { + return ImmutableAafCredentials.builder() + .username(username) + .password(password) + .build(); + } + + private void verifyBasicAuthHeader(Tuple2<String, String> basicAuthHeader, String encoded) { + Tuple2<String, String> expected = Tuple.of("Authorization", "Basic " + encoded); + assertThat(basicAuthHeader).isEqualTo(expected); + } +} |