diff options
author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-06-14 15:26:29 +0200 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-06-26 09:23:52 +0200 |
commit | 97d60b4653c265530485209e8f61f73137d521b0 (patch) | |
tree | 21bbbfe4d077df3062d0f9b8176b94a0941cea4f /rest-services/dmaap-client/src/main/java | |
parent | 3ec6e99c20075386b639f33f27bcd3a3b3e5706d (diff) |
Add text/plain content type handling in Publisher
Change-Id: I51e17d64f813e16b81385abb8aa862ee1f927d35
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1630
Diffstat (limited to 'rest-services/dmaap-client/src/main/java')
3 files changed, 58 insertions, 8 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java new file mode 100644 index 00000000..80a28d6c --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client; + +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.util.AsciiString; + +public enum ContentType { + APPLICATION_JSON(HttpHeaderValues.APPLICATION_JSON), + TEXT_PLAIN(HttpHeaderValues.TEXT_PLAIN); + + private AsciiString contentType; + + ContentType(AsciiString contentType) { + this.contentType = contentType; + } + + @Override + public String toString(){ + return contentType.toString(); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index aa88b9ee..191ec64f 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -27,6 +27,7 @@ import com.google.gson.JsonElement; import io.vavr.collection.HashMap; import io.vavr.collection.List; import java.time.Duration; +import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -35,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; @@ -74,14 +76,20 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { List<JsonElement> batch) { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); - return httpClient.call(buildHttpRequest(request, asJsonBody(batch))) + return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)); } - private @NotNull RequestBody asJsonBody(List<? extends JsonElement> subItems) { - final JsonArray elements = new JsonArray(subItems.size()); - subItems.forEach(elements::add); - return RequestBody.fromJson(elements); + private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { + if(contentType == ContentType.APPLICATION_JSON) { + final JsonArray elements = new JsonArray(subItems.size()); + subItems.forEach(elements::add); + return RequestBody.fromJson(elements); + }else if(contentType == ContentType.TEXT_PLAIN){ + String messages = subItems.map(JsonElement::toString) + .collect(Collectors.joining("\n")); + return RequestBody.fromString(messages); + }else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { @@ -89,7 +97,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType())) + .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) .body(body) .build(); } @@ -98,6 +106,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { HttpResponse httpResponse, List<JsonElement> batch) { final ImmutableMessageRouterPublishResponse.Builder builder = ImmutableMessageRouterPublishResponse.builder(); + return httpResponse.successful() ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 77f92e77..29904138 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -34,7 +35,7 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); @Value.Default - default String contentType() { - return "application/json"; + default ContentType contentType() { + return ContentType.APPLICATION_JSON; } } |