aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java56
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java63
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java10
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java5
4 files changed, 111 insertions, 23 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index f6ef94b7..159fc598 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.client.MockServerClient;
-import org.mockserver.matchers.TimeToLive;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
@@ -352,7 +351,9 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -381,8 +382,9 @@ class MessageRouterPublisherIT {
final String path = String.format("/events/%s", topic);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -396,11 +398,51 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterPublisherConfig retryConfig() {
+ @Test
+ void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
+ final String topic = "TOPIC13";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 1f4e499d..15c3bd8e 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -245,7 +245,7 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 5));
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
//when
Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -278,7 +278,8 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -314,8 +315,9 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -333,11 +335,58 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterSubscriberConfig retryConfig() {
+ @Test
+ void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC8";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 5));
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java
index d427ee5e..46f9431b 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
+import reactor.netty.ByteBufMono;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -58,12 +59,9 @@ public interface RequestBody {
}
static RequestBody fromString(String contents, Charset charset) {
- ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer();
- encodedContents.writeCharSequence(contents, charset);
-
return ImmutableRequestBody.builder()
- .length(encodedContents.readableBytes())
- .contents(Mono.just(encodedContents.retain()))
+ .length(contents.length())
+ .contents(ByteBufMono.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT))
.build();
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index d0bdf414..76bde27e 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -28,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryCo
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
@@ -113,7 +112,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
.request(request.method().asNetty())
- .send(Flux.from(request.body().contents()))
+ .send(request.body().contents())
.uri(request.url());
}
@@ -121,7 +120,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
.request(request.method().asNetty())
- .send(Flux.from(request.body().contents()))
+ .send(request.body().contents())
.uri(request.url());
}