From 91d17acfab525c96aee50ad14191c78f7e833376 Mon Sep 17 00:00:00 2001 From: tkogut Date: Tue, 29 Dec 2020 15:13:22 +0100 Subject: Add timeout for Publisher(dmaap-client) Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae --- rest-services/http-client/pom.xml | 2 +- .../rest/services/adapters/http/HttpRequest.java | 6 +- .../rest/services/adapters/http/RxHttpClient.java | 9 ++- .../services/adapters/http/RxHttpClientIT.java | 65 ++++++++++++++++------ 4 files changed, 60 insertions(+), 22 deletions(-) (limited to 'rest-services/http-client') diff --git a/rest-services/http-client/pom.xml b/rest-services/http-client/pom.xml index bae2f662..d9a11065 100644 --- a/rest-services/http-client/pom.xml +++ b/rest-services/http-client/pom.xml @@ -28,7 +28,7 @@ org.onap.dcaegen2.services.sdk dcaegen2-services-sdk-rest-services - 1.5.0-SNAPSHOT + 1.6.0-SNAPSHOT org.onap.dcaegen2.services.sdk.rest.services diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java index 33060c9f..9d2d1ee5 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,8 @@ import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import java.time.Duration; + /** * @author Piotr Jaszczyk * @since March 2019 @@ -37,6 +39,8 @@ public interface HttpRequest { HttpMethod method(); + @Nullable Duration timeout(); + @Nullable RequestBody body(); @Value.Default diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index 7ac02bf6..77b842d7 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.vavr.collection.Stream; +import io.vavr.control.Option; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +53,15 @@ public class RxHttpClient { } ResponseReceiver prepareRequest(HttpRequest request) { - final HttpClient theClient = httpClient + final HttpClient simpleClient = httpClient .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req)) .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp)) .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))); + final HttpClient theClient = Option.of(request.timeout()) + .map(simpleClient::responseTimeout) + .getOrElse(simpleClient); + return prepareBody(request, theClient); } diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java index cdddaeff..6f3a0909 100644 --- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; -import static org.assertj.core.api.Assertions.assertThat; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; - import io.netty.handler.codec.http.HttpResponseStatus; -import java.net.MalformedURLException; -import java.net.URL; -import java.time.Duration; +import io.netty.handler.timeout.ReadTimeoutException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -35,6 +30,13 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttp import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; + class RxHttpClientIT { private static final Duration TIMEOUT = Duration.ofHours(5); @@ -43,12 +45,13 @@ class RxHttpClientIT { @BeforeAll static void setUpClass() { - httpServer = DummyHttpServer.start(routes -> - routes.get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) - .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) - .post("/headers-post", (req, resp) -> resp - .sendString(Mono.just(req.requestHeaders().toString()))) - .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) + httpServer = DummyHttpServer.start(routes -> routes + .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) + .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1)))) + .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) + .post("/headers-post", (req, resp) -> resp + .sendString(Mono.just(req.requestHeaders().toString()))) + .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) ); } @@ -65,7 +68,9 @@ class RxHttpClientIT { @Test void simpleGet() throws Exception { // given - final HttpRequest httpRequest = requestFor("/sample-get").method(HttpMethod.GET).build(); + final HttpRequest httpRequest = requestFor("/sample-get") + .method(HttpMethod.GET) + .build(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -73,13 +78,18 @@ class RxHttpClientIT { .map(HttpResponse::bodyAsString); // then - StepVerifier.create(bodyAsString).expectNext("OK").expectComplete().verify(TIMEOUT); + StepVerifier.create(bodyAsString) + .expectNext("OK") + .expectComplete() + .verify(TIMEOUT); } @Test void getWithError() throws Exception { // given - final HttpRequest httpRequest = requestFor("/sample-get-500").method(HttpMethod.GET).build(); + final HttpRequest httpRequest = requestFor("/sample-get-500") + .method(HttpMethod.GET) + .build(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -87,7 +97,9 @@ class RxHttpClientIT { .map(HttpResponse::bodyAsString); // then - StepVerifier.create(bodyAsString).expectError(HttpException.class).verify(TIMEOUT); + StepVerifier.create(bodyAsString) + .expectError(HttpException.class) + .verify(TIMEOUT); } @Test @@ -158,4 +170,21 @@ class RxHttpClientIT { .expectComplete() .verify(TIMEOUT); } -} \ No newline at end of file + + @Test + void getWithTimeoutError() throws Exception { + // given + final HttpRequest httpRequest = requestFor("/delayed-get") + .method(HttpMethod.GET) + .timeout(Duration.ofSeconds(1)) + .build(); + + // when + final Mono response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(ReadTimeoutException.class) + .verify(TIMEOUT); + } +} -- cgit 1.2.3-korg