diff options
author | 2019-03-27 08:58:44 +0100 | |
---|---|---|
committer | 2019-04-01 14:59:20 +0200 | |
commit | 07636bbe415099175afcbf55c8f08a24e5c357b5 (patch) | |
tree | e657bc0c6057dee44d3d74baf66eb3e039a0096c /rest-services/common-dependency/src/main | |
parent | 6c25f486820d1fca8f8aa7c075dec832ca9d1536 (diff) |
Stub of the DMaaP Client 2.0 implementation
This is untested. Treat it as a proof of concept.
Change-Id: Ieeef7c9481324984c9772b216d001254dec11ae9
Issue-ID: DCAEGEN2-1368
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/common-dependency/src/main')
4 files changed, 99 insertions, 20 deletions
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java new file mode 100644 index 00000000..4ef43a59 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since April 2019 + */ +public final class HttpHeaders { + + private HttpHeaders() { + } + + public static final String CONTENT_TYPE = "Content-Type"; + public static final String CONTENT_LENGTH = "Content-Length"; +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java index 78660833..33060c9f 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java @@ -20,17 +20,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; -import io.netty.buffer.ByteBuf; import io.vavr.collection.HashMap; import io.vavr.collection.Map; -import java.util.function.BiFunction; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; -import reactor.netty.NettyOutbound; -import reactor.netty.http.client.HttpClientRequest; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -43,6 +37,8 @@ public interface HttpRequest { HttpMethod method(); + @Nullable RequestBody body(); + @Value.Default default RequestDiagnosticContext diagnosticContext() { return RequestDiagnosticContext.create(); @@ -53,11 +49,6 @@ public interface HttpRequest { return HashMap.empty(); } - @Value.Default - default Publisher<ByteBuf> body() { - return Mono.empty(); - } - @Value.Derived default Map<String, String> headers() { final RequestDiagnosticContext ctx = diagnosticContext(); diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java index 514ea0bf..ed218887 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java @@ -23,9 +23,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import com.google.gson.JsonElement; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.ByteBufFlux; @@ -33,20 +38,39 @@ import reactor.netty.ByteBufFlux; * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -public final class RequestBody { +@Value.Immutable +public interface RequestBody { - private RequestBody() { + Publisher<ByteBuf> contents(); + + @Nullable Integer length(); + + static RequestBody chunkedFromString(Publisher<String> contents) { + return chunkedFromString(contents, StandardCharsets.UTF_8); } - public static Publisher<ByteBuf> fromString(String contents) { + static RequestBody chunkedFromString(Publisher<String> contents, Charset charset) { + return ImmutableRequestBody.builder() + .length(null) + .contents(ByteBufFlux.fromString(contents, charset, ByteBufAllocator.DEFAULT)) + .build(); + } + + static RequestBody fromString(String contents) { return fromString(contents, StandardCharsets.UTF_8); } - public static Publisher<ByteBuf> fromString(String contents, Charset charset) { - return ByteBufFlux.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT); + static RequestBody fromString(String contents, Charset charset) { + ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer(); + encodedContents.writeCharSequence(contents, charset); + + return ImmutableRequestBody.builder() + .length(encodedContents.readableBytes()) + .contents(Mono.just(encodedContents.retain())) + .build(); } - public static Publisher<ByteBuf> fromJson(JsonElement contents) { + static RequestBody fromJson(JsonElement contents) { return fromString(contents.toString()); } diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index f384c1c1..709f5e59 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -61,14 +61,44 @@ public class RxHttpClient { } ResponseReceiver<?> prepareRequest(HttpRequest request) { - return httpClient + final HttpClient theClient = httpClient .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req)) .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp)) - .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))) + .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))); + + return prepareBody(request, theClient); + } + + private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { + if (request.body() == null) { + return prepareBodyWithoutContents(request, theClient); + } else { + return request.body().length() == null + ? prepareBodyChunked(request, theClient) + : prepareBodyUnchunked(request, theClient); + } + } + + private ResponseReceiver<?> prepareBodyChunked(HttpRequest request, HttpClient theClient) { + return theClient + .chunkedTransfer(true) + .request(request.method().asNetty()) + .send(request.body().contents()) + .uri(request.url()); + } + + private ResponseReceiver<?> prepareBodyUnchunked(HttpRequest request, HttpClient theClient) { + return theClient + .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(request.body()) + .send(request.body().contents()) .uri(request.url()); + } + private ResponseReceiver<?> prepareBodyWithoutContents(HttpRequest request, HttpClient theClient) { + return theClient + .request(request.method().asNetty()) + .uri(request.url()); } private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) { |