diff options
author | Pawel <pawel.kasperkiewicz@nokia.com> | 2021-01-05 13:40:47 +0100 |
---|---|---|
committer | Pawel <pawel.kasperkiewicz@nokia.com> | 2021-01-07 15:43:58 +0100 |
commit | f0c78ff2bbb5dd515cc4999ad639b8d5b15a84eb (patch) | |
tree | 703db28f6b8947e05d0babf6c05d5ac4bde3d716 | |
parent | 91d17acfab525c96aee50ad14191c78f7e833376 (diff) |
Add timeout for Subscriber(dmaap-client)
Issue-ID: DCAEGEN2-1483
Signed-off-by: Pawel <pawel.kasperkiewicz@nokia.com>
Change-Id: Ib733af0541a1aad84691a2db97c1e495f0162866
6 files changed, 141 insertions, 26 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index 72c0bad3..f7ccf4f2 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,11 @@ import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.List; import java.nio.charset.StandardCharsets; + +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -35,6 +38,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; @@ -59,16 +65,22 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { @Override public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) { LOGGER.debug("Requesting new items from DMaaP MR: {}", request); - return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse); + return httpClient.call(buildGetHttpRequest(request)) + .map(this::buildGetResponse) + .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); } private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { - return ImmutableHttpRequest.builder() + ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() .method(HttpMethod.GET) .url(buildSubscribeUrl(request)) - .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .build(); + .diagnosticContext(request.diagnosticContext().withNewInvocationId()); + + return Option.of(request.timeoutConfig()) + .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) + .getOrElse(requestBuilder::build); } private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) { @@ -90,4 +102,11 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(), request.consumerId()); } + + private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .failReason(failReason) + .build()); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java index 212d8f2a..95c5e7d1 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** @@ -32,4 +34,6 @@ public interface DmaapRequest { default RequestDiagnosticContext diagnosticContext() { return RequestDiagnosticContext.create(); } + + @Nullable TimeoutConfig timeoutConfig(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 4490c79f..ccd516d9 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; -import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -35,8 +33,6 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); - @Nullable TimeoutConfig timeoutConfig(); - @Value.Default default ContentType contentType() { return ContentType.APPLICATION_JSON; diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index d94639a5..2b8027c1 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,19 +71,37 @@ public final class MessageRouterTestsUtils { public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, String consumerGroup, String consumerId) { - ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() - .name("the topic") - .topicUrl(topicUrl) + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(getImmutableMessageRouterSource(topicUrl)) + .consumerGroup(consumerGroup) + .consumerId(consumerId) .build(); + } + + public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId, + Duration timeout) { return ImmutableMessageRouterSubscribeRequest .builder() - .sourceDefinition(sourceDefinition) + .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeout(timeout) + .build()) + .sourceDefinition(getImmutableMessageRouterSource(topicUrl)) .consumerGroup(consumerGroup) .consumerId(consumerId) .build(); } + private static ImmutableMessageRouterSource getImmutableMessageRouterSource(String topicUrl) { + return ImmutableMessageRouterSource.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + public static List<JsonElement> getAsJsonElements(List<String> messages) { return messages.map(JsonParser::parseString); } @@ -109,9 +127,10 @@ public final class MessageRouterTestsUtils { } public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) { + String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs); return ImmutableMessageRouterSubscribeResponse .builder() - .failReason(String.format(failReasonFormat, formatArgs)) + .failReason(failReason) .build(); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 7a31209c..bd161aab 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -37,8 +39,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.io.IOException; import java.time.Duration; +import java.util.concurrent.TimeUnit; +import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse; @@ -48,11 +53,18 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; + @Testcontainers class MessageRouterSubscriberIT { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; + private static String PROXY_EVENTS_PATH; + private static Proxy DMAAP_PROXY; private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + "{" + "\"mrstatus\":3001," + @@ -60,6 +72,18 @@ class MessageRouterSubscriberIT { "\"message\":\"No such topic exists.-[%s]\"," + "\"status\":404" + "}"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + + "{" + + "\"requestError\":" + + "{" + + "\"serviceException\":" + + "{" + + "\"messageId\":\"SVC0001\"," + + "\"text\":\"Client timeout exception occurred, Error code is %1\"," + + "\"variables\":[\"408\"]" + + "}" + + "}" + + "}"; @Container private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); @@ -73,14 +97,16 @@ class MessageRouterSubscriberIT { @BeforeAll - static void setUp() { - EVENTS_PATH = String.format("http://%s:%d/events", - CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME, - DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT), - CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME, - DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT)); + static void setUp() throws IOException { + EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); + PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); + + DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", + String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), + String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); } + @Test void subscriber_shouldHandleNoSuchTopicException() { //given @@ -207,6 +233,30 @@ class MessageRouterSubscriberIT { .verify(TIMEOUT); } + @Test + void subscriber_shouldHandleTimeoutException() throws IOException { + //given + final String topic = "newTopic"; + final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest( + String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( + TIMEOUT_ERROR_MESSAGE); + final String toxic = "latency-toxic"; + DMAAP_PROXY.toxics() + .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); + //when + Mono<MessageRouterSubscribeResponse> response = subscriber + .get(mrSubscribeRequest); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + + //cleanup + DMAAP_PROXY.toxics().get(toxic).remove(); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index ef2cb5ef..1f97001e 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; +import io.netty.handler.timeout.ReadTimeoutException; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; @@ -48,7 +49,8 @@ class MessageRouterSubscriberImplTest { private final RxHttpClient httpClient = mock(RxHttpClient.class); private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault(); - private final MessageRouterSubscriber cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance()); + private final MessageRouterSubscriber + cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance()); private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() @@ -129,4 +131,29 @@ class MessageRouterSubscriberImplTest { // then assertThatExceptionOfType(JsonSyntaxException.class).isThrownBy(() -> cut.get(mrRequest).block()); } + + @Test + void getWithProperRequest_shouldReturnTimeoutError() { + + // given + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + + // when + final Mono<MessageRouterSubscribeResponse> responses = cut + .get(mrRequest); + final MessageRouterSubscribeResponse response = responses.block(); + + // then + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).contains("408 Request Timeout"); + assertThat(response.hasElements()).isFalse(); + + + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET); + assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(), + mrRequest.consumerGroup(), mrRequest.consumerId())); + assertThat(httpRequest.body()).isNull(); + } }
\ No newline at end of file |