diff options
4 files changed, 111 insertions, 23 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index f6ef94b7..159fc598 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockserver.client.MockServerClient; -import org.mockserver.matchers.TimeToLive; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; @@ -352,7 +351,9 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT .when(request().withPath(path), Times.once()) .respond(response().withStatusCode(404)); - final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher( + retryConfig(1, 1)); //when final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); @@ -381,8 +382,9 @@ class MessageRouterPublisherIT { final String path = String.format("/events/%s", topic); MOCK_SERVER_CLIENT .when(request().withPath(path), Times.once()) - .respond(response().withDelay(TimeUnit.SECONDS, 10)); - final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher( + retryConfig(1, 1)); //when final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); @@ -396,11 +398,51 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - private MessageRouterPublisherConfig retryConfig() { + @Test + void publisher_shouldRetryManyTimesAndSuccessfullyPublish() { + final String topic = "TOPIC13"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages); + final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5)); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); + } + + private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() - .retryIntervalInSeconds(1) - .retryCount(1) + .retryIntervalInSeconds(retryInterval) + .retryCount(retryCount) .build()) .build(); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 1f4e499d..15c3bd8e 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -245,7 +245,7 @@ class MessageRouterSubscriberIT { final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); MOCK_SERVER_CLIENT .when(request().withPath(path), Times.once()) - .respond(response().withDelay(TimeUnit.SECONDS, 5)); + .respond(response().withDelay(TimeUnit.SECONDS, 2)); //when Mono<MessageRouterSubscribeResponse> response = subscriber @@ -278,7 +278,8 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT .when(request().withPath(path), Times.once()) .respond(response().withStatusCode(404)); - final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber( + retryConfig(1, 1)); //when registerTopic(publisher, createPublishRequest(topicUrl), subscriber, @@ -314,8 +315,9 @@ class MessageRouterSubscriberIT { final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); MOCK_SERVER_CLIENT .when(request().withPath(path), Times.once()) - .respond(response().withDelay(TimeUnit.SECONDS, 10)); - final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber( + retryConfig(1, 1)); //when registerTopic(publisher, createPublishRequest(topicUrl), subscriber, @@ -333,11 +335,58 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - private MessageRouterSubscriberConfig retryConfig() { + @Test + void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC8"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber( + retryConfig(1, 5)); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); + } + + private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterSubscriberConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() - .retryIntervalInSeconds(1) - .retryCount(1) + .retryIntervalInSeconds(retryInterval) + .retryCount(retryCount) .build()) .build(); } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java index d427ee5e..46f9431b 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.ByteBufFlux; +import reactor.netty.ByteBufMono; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -58,12 +59,9 @@ public interface RequestBody { } static RequestBody fromString(String contents, Charset charset) { - ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer(); - encodedContents.writeCharSequence(contents, charset); - return ImmutableRequestBody.builder() - .length(encodedContents.readableBytes()) - .contents(Mono.just(encodedContents.retain())) + .length(contents.length()) + .contents(ByteBufMono.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT)) .build(); } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index d0bdf414..76bde27e 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -28,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryCo import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; @@ -113,7 +112,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED)) .request(request.method().asNetty()) - .send(Flux.from(request.body().contents())) + .send(request.body().contents()) .uri(request.url()); } @@ -121,7 +120,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(Flux.from(request.body().contents())) + .send(request.body().contents()) .uri(request.url()); } |