aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPawel <pawel.kasperkiewicz@nokia.com>2021-01-05 13:40:47 +0100
committerPawel <pawel.kasperkiewicz@nokia.com>2021-01-07 15:43:58 +0100
commitf0c78ff2bbb5dd515cc4999ad639b8d5b15a84eb (patch)
tree703db28f6b8947e05d0babf6c05d5ac4bde3d716
parent91d17acfab525c96aee50ad14191c78f7e833376 (diff)
Add timeout for Subscriber(dmaap-client)
Issue-ID: DCAEGEN2-1483 Signed-off-by: Pawel <pawel.kasperkiewicz@nokia.com> Change-Id: Ib733af0541a1aad84691a2db97c1e495f0162866
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java29
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java6
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java6
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java31
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java64
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java31
6 files changed, 141 insertions, 26 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index 72c0bad3..f7ccf4f2 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,8 +26,11 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
+
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -35,6 +38,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
@@ -59,16 +65,22 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
@Override
public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
- return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
+ return httpClient.call(buildGetHttpRequest(request))
+ .map(this::buildGetResponse)
+ .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
}
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
- return ImmutableHttpRequest.builder()
+ ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
.url(buildSubscribeUrl(request))
- .diagnosticContext(request.diagnosticContext().withNewInvocationId())
- .build();
+ .diagnosticContext(request.diagnosticContext().withNewInvocationId());
+
+ return Option.of(request.timeoutConfig())
+ .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+ .getOrElse(requestBuilder::build);
}
private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
@@ -90,4 +102,11 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
request.consumerId());
}
+
+ private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason(failReason)
+ .build());
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
index 212d8f2a..95c5e7d1 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
/**
@@ -32,4 +34,6 @@ public interface DmaapRequest {
default RequestDiagnosticContext diagnosticContext() {
return RequestDiagnosticContext.create();
}
+
+ @Nullable TimeoutConfig timeoutConfig();
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
index 4490c79f..ccd516d9 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +21,8 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -35,8 +33,6 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
MessageRouterSink sinkDefinition();
- @Nullable TimeoutConfig timeoutConfig();
-
@Value.Default
default ContentType contentType() {
return ContentType.APPLICATION_JSON;
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
index d94639a5..2b8027c1 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -71,19 +71,37 @@ public final class MessageRouterTestsUtils {
public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
String consumerGroup, String consumerId) {
- ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(topicUrl)
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
.build();
+ }
+
+ public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId,
+ Duration timeout) {
return ImmutableMessageRouterSubscribeRequest
.builder()
- .sourceDefinition(sourceDefinition)
+ .timeoutConfig(ImmutableTimeoutConfig.builder()
+ .timeout(timeout)
+ .build())
+ .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
.consumerGroup(consumerGroup)
.consumerId(consumerId)
.build();
}
+ private static ImmutableMessageRouterSource getImmutableMessageRouterSource(String topicUrl) {
+ return ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
+
public static List<JsonElement> getAsJsonElements(List<String> messages) {
return messages.map(JsonParser::parseString);
}
@@ -109,9 +127,10 @@ public final class MessageRouterTestsUtils {
}
public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) {
+ String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
return ImmutableMessageRouterSubscribeResponse
.builder()
- .failReason(String.format(failReasonFormat, formatArgs))
+ .failReason(failReason)
.build();
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 7a31209c..bd161aab 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -37,8 +39,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.io.IOException;
import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
@@ -48,11 +53,18 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+
@Testcontainers
class MessageRouterSubscriberIT {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String CONSUMER_GROUP = "group1";
private static final String CONSUMER_ID = "consumer200";
+ private static String PROXY_EVENTS_PATH;
+ private static Proxy DMAAP_PROXY;
private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
"{" +
"\"mrstatus\":3001," +
@@ -60,6 +72,18 @@ class MessageRouterSubscriberIT {
"\"message\":\"No such topic exists.-[%s]\"," +
"\"status\":404" +
"}";
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ + "{"
+ + "\"requestError\":"
+ + "{"
+ + "\"serviceException\":"
+ + "{"
+ + "\"messageId\":\"SVC0001\","
+ + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+ + "\"variables\":[\"408\"]"
+ + "}"
+ + "}"
+ + "}";
@Container
private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
@@ -73,14 +97,16 @@ class MessageRouterSubscriberIT {
@BeforeAll
- static void setUp() {
- EVENTS_PATH = String.format("http://%s:%d/events",
- CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
- DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
- CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
- DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
+ static void setUp() throws IOException {
+ EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
+ PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+
+ DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
+ String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
+ String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
}
+
@Test
void subscriber_shouldHandleNoSuchTopicException() {
//given
@@ -207,6 +233,30 @@ class MessageRouterSubscriberIT {
.verify(TIMEOUT);
}
+ @Test
+ void subscriber_shouldHandleTimeoutException() throws IOException {
+ //given
+ final String topic = "newTopic";
+ final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+ String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ TIMEOUT_ERROR_MESSAGE);
+ final String toxic = "latency-toxic";
+ DMAAP_PROXY.toxics()
+ .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber
+ .get(mrSubscribeRequest);
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ //cleanup
+ DMAAP_PROXY.toxics().get(toxic).remove();
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
index ef2cb5ef..1f97001e 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.gson.JsonSyntaxException;
+import io.netty.handler.timeout.ReadTimeoutException;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
@@ -48,7 +49,8 @@ class MessageRouterSubscriberImplTest {
private final RxHttpClient httpClient = mock(RxHttpClient.class);
private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
- private final MessageRouterSubscriber cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
+ private final MessageRouterSubscriber
+ cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
@@ -129,4 +131,29 @@ class MessageRouterSubscriberImplTest {
// then
assertThatExceptionOfType(JsonSyntaxException.class).isThrownBy(() -> cut.get(mrRequest).block());
}
+
+ @Test
+ void getWithProperRequest_shouldReturnTimeoutError() {
+
+ // given
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
+ // when
+ final Mono<MessageRouterSubscribeResponse> responses = cut
+ .get(mrRequest);
+ final MessageRouterSubscribeResponse response = responses.block();
+
+ // then
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).contains("408 Request Timeout");
+ assertThat(response.hasElements()).isFalse();
+
+
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+ assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+ mrRequest.consumerGroup(), mrRequest.consumerId()));
+ assertThat(httpRequest.body()).isNull();
+ }
} \ No newline at end of file