diff options
author | 2019-03-27 08:58:44 +0100 | |
---|---|---|
committer | 2019-04-01 14:59:20 +0200 | |
commit | 07636bbe415099175afcbf55c8f08a24e5c357b5 (patch) | |
tree | e657bc0c6057dee44d3d74baf66eb3e039a0096c /rest-services/common-dependency | |
parent | 6c25f486820d1fca8f8aa7c075dec832ca9d1536 (diff) |
Stub of the DMaaP Client 2.0 implementation
This is untested. Treat it as a proof of concept.
Change-Id: Ieeef7c9481324984c9772b216d001254dec11ae9
Issue-ID: DCAEGEN2-1368
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/common-dependency')
6 files changed, 154 insertions, 20 deletions
diff --git a/rest-services/common-dependency/pom.xml b/rest-services/common-dependency/pom.xml index 4d9b2e08..0472fac6 100644 --- a/rest-services/common-dependency/pom.xml +++ b/rest-services/common-dependency/pom.xml @@ -62,6 +62,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java new file mode 100644 index 00000000..4ef43a59 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since April 2019 + */ +public final class HttpHeaders { + + private HttpHeaders() { + } + + public static final String CONTENT_TYPE = "Content-Type"; + public static final String CONTENT_LENGTH = "Content-Length"; +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java index 78660833..33060c9f 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java @@ -20,17 +20,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; -import io.netty.buffer.ByteBuf; import io.vavr.collection.HashMap; import io.vavr.collection.Map; -import java.util.function.BiFunction; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; -import reactor.netty.NettyOutbound; -import reactor.netty.http.client.HttpClientRequest; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -43,6 +37,8 @@ public interface HttpRequest { HttpMethod method(); + @Nullable RequestBody body(); + @Value.Default default RequestDiagnosticContext diagnosticContext() { return RequestDiagnosticContext.create(); @@ -53,11 +49,6 @@ public interface HttpRequest { return HashMap.empty(); } - @Value.Default - default Publisher<ByteBuf> body() { - return Mono.empty(); - } - @Value.Derived default Map<String, String> headers() { final RequestDiagnosticContext ctx = diagnosticContext(); diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java index 514ea0bf..ed218887 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java @@ -23,9 +23,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import com.google.gson.JsonElement; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.ByteBufFlux; @@ -33,20 +38,39 @@ import reactor.netty.ByteBufFlux; * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -public final class RequestBody { +@Value.Immutable +public interface RequestBody { - private RequestBody() { + Publisher<ByteBuf> contents(); + + @Nullable Integer length(); + + static RequestBody chunkedFromString(Publisher<String> contents) { + return chunkedFromString(contents, StandardCharsets.UTF_8); } - public static Publisher<ByteBuf> fromString(String contents) { + static RequestBody chunkedFromString(Publisher<String> contents, Charset charset) { + return ImmutableRequestBody.builder() + .length(null) + .contents(ByteBufFlux.fromString(contents, charset, ByteBufAllocator.DEFAULT)) + .build(); + } + + static RequestBody fromString(String contents) { return fromString(contents, StandardCharsets.UTF_8); } - public static Publisher<ByteBuf> fromString(String contents, Charset charset) { - return ByteBufFlux.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT); + static RequestBody fromString(String contents, Charset charset) { + ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer(); + encodedContents.writeCharSequence(contents, charset); + + return ImmutableRequestBody.builder() + .length(encodedContents.readableBytes()) + .contents(Mono.just(encodedContents.retain())) + .build(); } - public static Publisher<ByteBuf> fromJson(JsonElement contents) { + static RequestBody fromJson(JsonElement contents) { return fromString(contents.toString()); } diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index f384c1c1..709f5e59 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -61,14 +61,44 @@ public class RxHttpClient { } ResponseReceiver<?> prepareRequest(HttpRequest request) { - return httpClient + final HttpClient theClient = httpClient .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req)) .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp)) - .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))) + .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))); + + return prepareBody(request, theClient); + } + + private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { + if (request.body() == null) { + return prepareBodyWithoutContents(request, theClient); + } else { + return request.body().length() == null + ? prepareBodyChunked(request, theClient) + : prepareBodyUnchunked(request, theClient); + } + } + + private ResponseReceiver<?> prepareBodyChunked(HttpRequest request, HttpClient theClient) { + return theClient + .chunkedTransfer(true) + .request(request.method().asNetty()) + .send(request.body().contents()) + .uri(request.url()); + } + + private ResponseReceiver<?> prepareBodyUnchunked(HttpRequest request, HttpClient theClient) { + return theClient + .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(request.body()) + .send(request.body().contents()) .uri(request.url()); + } + private ResponseReceiver<?> prepareBodyWithoutContents(HttpRequest request, HttpClient theClient) { + return theClient + .request(request.method().asNetty()) + .uri(request.url()); } private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) { diff --git a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java index 5ae62c87..8c57a693 100644 --- a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java +++ b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; import io.netty.handler.codec.http.HttpResponseStatus; @@ -45,6 +46,8 @@ class RxHttpClientIT { httpServer = DummyHttpServer.start(routes -> routes.get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) + .post("/headers-post", (req, resp) -> resp + .sendString(Mono.just(req.requestHeaders().toString()))) .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) ); } @@ -107,4 +110,52 @@ class RxHttpClientIT { .expectComplete() .verify(TIMEOUT); } + + @Test + void testChunkedEncoding() throws Exception { + // given + final String requestBody = "hello world"; + final HttpRequest httpRequest = requestFor("/headers-post") + .method(HttpMethod.POST) + .body(RequestBody.chunkedFromString(Mono.just(requestBody))) + .build(); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString.map(String::toLowerCase)) + .consumeNextWith(responseBody -> { + assertThat(responseBody).contains("transfer-encoding: chunked"); + assertThat(responseBody).doesNotContain("content-length"); + }) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void testUnchunkedEncoding() throws Exception { + // given + final String requestBody = "hello world"; + final HttpRequest httpRequest = requestFor("/headers-post") + .method(HttpMethod.POST) + .body(RequestBody.fromString(requestBody)) + .build(); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString.map(String::toLowerCase)) + .consumeNextWith(responseBody -> { + assertThat(responseBody).doesNotContain("transfer-encoding"); + assertThat(responseBody).contains("content-length"); + }) + .expectComplete() + .verify(TIMEOUT); + } }
\ No newline at end of file |