diff options
author | tkogut <tomasz.kogut@nokia.com> | 2020-12-29 15:13:22 +0100 |
---|---|---|
committer | tkogut <tomasz.kogut@nokia.com> | 2020-12-30 16:48:26 +0100 |
commit | 91d17acfab525c96aee50ad14191c78f7e833376 (patch) | |
tree | 6f9a417ed318e45ff7b78a3b3b12f6ab62a5734d /rest-services/dmaap-client/src | |
parent | 8eaf72890a94eceddbbbdcf5015afffaa98176a7 (diff) |
Add timeout for Publisher(dmaap-client)
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae
Diffstat (limited to 'rest-services/dmaap-client/src')
16 files changed, 609 insertions, 122 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java new file mode 100644 index 00000000..57187c80 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface ClientError { + RequestError requestError(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java new file mode 100644 index 00000000..9754719e --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.value.Value; +import reactor.util.annotation.Nullable; + +import java.util.List; + +@Value.Immutable +public interface ClientErrorReason { + String header(); + + String messageId(); + + String text(); + + @Nullable List<String> variables(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java new file mode 100644 index 00000000..6b22b378 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.vavr.control.Option; + +public class ClientErrorReasonPresenter { + + private ClientErrorReasonPresenter() { } + + private static final Gson GSON = new GsonBuilder().create(); + private static final String PATTERN = "%s\n%s"; + + public static String present(ClientErrorReason clientErrorReason) { + ImmutableServiceException simpleServiceException = ImmutableServiceException.builder() + .messageId(clientErrorReason.messageId()) + .text(clientErrorReason.text()) + .build(); + ImmutableServiceException serviceException = Option.of(clientErrorReason.variables()) + .map(simpleServiceException::withVariables) + .getOrElse(simpleServiceException); + ImmutableRequestError requestError = ImmutableRequestError.builder() + .serviceException(serviceException) + .build(); + ClientError clientError = ImmutableClientError.builder() + .requestError(requestError) + .build(); + return String.format(PATTERN, clientErrorReason.header(), GSON.toJson(clientError)); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java new file mode 100644 index 00000000..5a51e5f2 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import java.util.Collections; + +public class ClientErrorReasons { + + private ClientErrorReasons() { } + + public static final ClientErrorReason TIMEOUT = ImmutableClientErrorReason.builder() + .header("408 Request Timeout") + .text("Client timeout exception occurred, Error code is %1") + .messageId("SVC0001") + .variables(Collections.singletonList("408")).build(); + +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java new file mode 100644 index 00000000..71e673fe --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface RequestError { + ServiceException serviceException(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java new file mode 100644 index 00000000..e99330ac --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import reactor.util.annotation.Nullable; + +import java.util.List; + +@Value.Immutable +@Gson.TypeAdapters +public interface ServiceException { + + String messageId(); + + String text(); + + @Nullable List<String> variables(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 191ec64f..16068da0 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,12 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; -import java.time.Duration; -import java.util.stream.Collectors; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -38,6 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; @@ -47,6 +48,11 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.stream.Collectors; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 @@ -77,29 +83,34 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) - .map(httpResponse -> buildResponse(httpResponse, batch)); + .map(httpResponse -> buildResponse(httpResponse, batch)) + .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); } private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { - if(contentType == ContentType.APPLICATION_JSON) { + if (contentType == ContentType.APPLICATION_JSON) { final JsonArray elements = new JsonArray(subItems.size()); subItems.forEach(elements::add); return RequestBody.fromJson(elements); - }else if(contentType == ContentType.TEXT_PLAIN){ + } else if (contentType == ContentType.TEXT_PLAIN) { String messages = subItems.map(JsonElement::toString) .collect(Collectors.joining("\n")); return RequestBody.fromString(messages); - }else throw new IllegalArgumentException("Unsupported content type: " + contentType); + } else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { - return ImmutableHttpRequest.builder() + ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) - .body(body) - .build(); + .body(body); + + return Option.of(request.timeoutConfig()) + .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) + .getOrElse(requestBuilder::build); } private MessageRouterPublishResponse buildResponse( @@ -111,4 +122,11 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); } + + private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + return Mono.just(ImmutableMessageRouterPublishResponse.builder() + .failReason(failReason) + .build()); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 314756d8..4490c79f 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -33,6 +35,8 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); + @Nullable TimeoutConfig timeoutConfig(); + @Value.Default default ContentType contentType() { return ContentType.APPLICATION_JSON; diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java new file mode 100644 index 00000000..413bf8e5 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import org.immutables.value.Value; + +import java.time.Duration; + +@Value.Immutable +public interface TimeoutConfig { + + @Value.Default + default Duration getTimeout() { + return Duration.ofSeconds(4); + } +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index 8561e0b0..d94639a5 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -28,7 +28,6 @@ import com.google.gson.JsonPrimitive; import io.vavr.collection.List; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; @@ -39,30 +38,39 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig; import reactor.core.publisher.Flux; +import java.time.Duration; + public final class MessageRouterTestsUtils { - private MessageRouterTestsUtils() {} + private MessageRouterTestsUtils() { + } - public static MessageRouterPublishRequest createPublishRequest(String topicUrl){ + public static MessageRouterPublishRequest createPublishRequest(String topicUrl) { return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON); } - public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){ - MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() - .name("the topic") - .topicUrl(topicUrl) + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, Duration timeout) { + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeout(timeout) + .build()) .build(); + } + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType) { return ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) + .sinkDefinition(createMessageRouterSink(topicUrl)) .contentType(contentType) .build(); } public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, - String consumerGroup, String consumerId) { + String consumerGroup, String consumerId) { ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() .name("the topic") .topicUrl(topicUrl) @@ -76,52 +84,53 @@ public final class MessageRouterTestsUtils { .build(); } - public static List<JsonElement> getAsJsonElements(List<String> messages){ + public static List<JsonElement> getAsJsonElements(List<String> messages) { return messages.map(JsonParser::parseString); } - public static List<JsonObject> getAsJsonObjects(List<String> messages){ + public static List<JsonObject> getAsJsonObjects(List<String> messages) { return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); } - public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){ + public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages) { return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive); } - public static JsonObject getAsJsonObject(String item){ + public static JsonObject getAsJsonObject(String item) { return new Gson().fromJson(item, JsonObject.class); } - public static Flux<JsonElement> plainBatch(List<String> messages){ + public static Flux<JsonElement> plainBatch(List<String> messages) { return Flux.fromIterable(getAsJsonElements(messages)); } - public static Flux<JsonObject> jsonBatch(List<String> messages){ + public static Flux<JsonObject> jsonBatch(List<String> messages) { return Flux.fromIterable(getAsJsonObjects(messages)); } - public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){ + public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) { return ImmutableMessageRouterSubscribeResponse .builder() .failReason(String.format(failReasonFormat, formatArgs)) .build(); } - public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){ + public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items) { return ImmutableMessageRouterSubscribeResponse .builder() .items(items) .build(); } - public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){ + public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) { + String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs); return ImmutableMessageRouterPublishResponse .builder() - .failReason(String.format(failReasonFormat, formatArgs)) + .failReason(failReason) .build(); } - public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){ + public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) { return ImmutableMessageRouterPublishResponse .builder() .items(items) @@ -129,7 +138,7 @@ public final class MessageRouterTestsUtils { } public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest, - MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) { + MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) { final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}", "{\"differentMessage\":\"message2\"}"); final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages); @@ -137,4 +146,11 @@ public final class MessageRouterTestsUtils { publisher.put(publishRequest, jsonMessageBatch).blockLast(); subscriber.get(subscribeRequest).block(); } + + private static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java index a314ccf1..494ca62a 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java @@ -31,6 +31,8 @@ final class DMaapContainer { MR_COMPOSE_RESOURCE_NAME); static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; static final String DMAAP_SERVICE_NAME = "dmaap"; + static final int PROXY_SERVICE_EXPOSED_PORT = 8666; + static final String LOCALHOST = "localhost"; private DMaapContainer() {} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index f62359dd..24cd2c34 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -39,8 +41,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.io.IOException; import java.time.Duration; +import java.util.concurrent.TimeUnit; +import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse; @@ -50,6 +55,10 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; @Testcontainers class MessageRouterPublisherIT { @@ -64,23 +73,38 @@ class MessageRouterPublisherIT { + "Successfully published number of messages :0." + "Expected { to start an object.\",\"status\":400" + "}"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + + "{" + + "\"requestError\":" + + "{" + + "\"serviceException\":" + + "{" + + "\"messageId\":\"SVC0001\"," + + "\"text\":\"Client timeout exception occurred, Error code is %1\"," + + "\"variables\":[\"408\"]" + + "}" + + "}" + + "}"; + private static Proxy DMAAP_PROXY; private static String EVENTS_PATH; + private static String PROXY_EVENTS_PATH; private final MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber subscriber = DmaapClientFactory + private final MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); @BeforeAll - static void setUp() { - EVENTS_PATH = String.format("http://%s:%d/events", - CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME, - DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT), - CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME, - DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT)); + static void setUp() throws IOException { + EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); + PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); + + DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", + String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), + String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); } @Test - void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){ + void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given final String topic = "TOPIC"; final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", @@ -100,7 +124,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldHandleBadRequestError(){ + void publisher_shouldHandleBadRequestError() { //given final String topic = "TOPIC2"; final List<String> threePlainTextMessages = List.of("I", "like", "pizza"); @@ -120,7 +144,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishSingleMessage(){ + void publisher_shouldSuccessfullyPublishSingleMessage() { //given final String topic = "TOPIC3"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -145,7 +169,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishMultipleMessages(){ + void publisher_shouldSuccessfullyPublishMultipleMessages() { final String topic = "TOPIC5"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}", @@ -170,7 +194,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){ + void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() { //given final String topic = "TOPIC6"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -197,7 +221,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){ + void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() { //given final String topic = "TOPIC7"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -224,7 +248,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){ + void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() { //given final String topic = "TOPIC8"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -251,7 +275,7 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){ + void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() { //given final String topic = "TOPIC9"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -276,4 +300,30 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } + + @Test + void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException { + //given + final String toxic = "latency-toxic"; + DMAAP_PROXY.toxics() + .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); + final String topic = "TOPIC10"; + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage); + final MessageRouterPublishRequest mrRequest = createPublishRequest( + String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1)); + final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + + //cleanup + DMAAP_PROXY.toxics().get(toxic).remove(); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 1268a16a..b0a07eda 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; - import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; - -import java.time.Duration; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; @@ -44,6 +38,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.time.Duration; + +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 @@ -69,18 +68,19 @@ class MessageRouterPublisherTest { @BeforeAll static void setUp() { - server = DummyHttpServer.start(routes -> - routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) - .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> - sendError(resp, 400, ERROR_MESSAGE)) - .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> - sendError(resp, 401, ERROR_MESSAGE)) - .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> - sendError(resp, 403, ERROR_MESSAGE)) - .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> - sendError(resp, 404, ERROR_MESSAGE)) - .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) -> - sendError(resp, 500, ERROR_MESSAGE)) + server = DummyHttpServer.start(routes -> routes + .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> + sendString(resp, Mono.just("OK"))) + .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> + sendError(resp, 400, ERROR_MESSAGE)) + .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> + sendError(resp, 401, ERROR_MESSAGE)) + .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> + sendError(resp, 403, ERROR_MESSAGE)) + .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> + sendError(resp, 404, ERROR_MESSAGE)) + .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) -> + sendError(resp, 500, ERROR_MESSAGE)) ); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java new file mode 100644 index 00000000..9b318d73 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ClientErrorReasonPresenterTest { + + @Test + void shouldSuccessfullyPresent() { + //given + ClientErrorReason clientErrorReason = createSimple(); + String expected = "header\n" + + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}"; + + //when + String actual = ClientErrorReasonPresenter.present(clientErrorReason); + + //then + assertThat(actual).isEqualTo(expected); + } + + @Test + void shouldSuccessfullyPresentWithVariables() { + //given + ClientErrorReason clientErrorReason = createSimple().withVariables("v1", "v2"); + String expected = "header\n" + + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}"; + + //when + String actual = ClientErrorReasonPresenter.present(clientErrorReason); + + //then + assertThat(actual).isEqualTo(expected); + } + + private ImmutableClientErrorReason createSimple() { + return ImmutableClientErrorReason.builder() + .header("header") + .messageId("messageId") + .text("text") + .build(); + } +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 38659acd..f29bfa27 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,24 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; - +import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.Gson; import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.List; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; @@ -54,6 +45,22 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 @@ -62,6 +69,7 @@ class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; private static final int MAX_BATCH_SIZE = 3; + public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout"; private final RxHttpClient httpClient = mock(RxHttpClient.class); private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1)); private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); @@ -87,11 +95,11 @@ class MessageRouterPublisherImplTest { assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST); assertThat(httpRequest.url()).isEqualTo(TOPIC_URL); assertThat(httpRequest.body()).isNotNull(); - assertThat(httpRequest.body().length()).isGreaterThan(0); + assertThat(httpRequest.body().length()).isPositive(); } @Test - void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() { // given final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); @@ -115,9 +123,8 @@ class MessageRouterPublisherImplTest { } - @Test - void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() { // given final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); @@ -143,7 +150,7 @@ class MessageRouterPublisherImplTest { } @Test - void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() { // given final List<String> threePlainMessages = List.of("I", "like", "cookies"); final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); @@ -168,7 +175,7 @@ class MessageRouterPublisherImplTest { } @Test - void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){ + void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() { // given final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); @@ -267,7 +274,7 @@ class MessageRouterPublisherImplTest { final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1)); assertThat(secondRequest.size()).describedAs("Http request second batch size") - .isEqualTo(MAX_BATCH_SIZE-1); + .isEqualTo(MAX_BATCH_SIZE - 1); assertListsContainSameElements(secondRequest, parsedTwoMessages); } @@ -303,7 +310,7 @@ class MessageRouterPublisherImplTest { final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) .map(JsonElement::getAsJsonObject); assertThat(secondRequest.size()).describedAs("Http request second batch size") - .isEqualTo(MAX_BATCH_SIZE-1); + .isEqualTo(MAX_BATCH_SIZE - 1); assertListsContainSameElements(secondRequest, parsedTwoMessages); } @@ -339,7 +346,7 @@ class MessageRouterPublisherImplTest { final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) .map(JsonElement::getAsJsonPrimitive); assertThat(secondRequest.size()).describedAs("Http request second batch size") - .isEqualTo(MAX_BATCH_SIZE-1); + .isEqualTo(MAX_BATCH_SIZE - 1); assertListsContainSameElements(secondRequest, parsedTwoPlainMessages); } @@ -404,12 +411,79 @@ class MessageRouterPublisherImplTest { verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); } - private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){ + @Test + void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertTimeoutError) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() { + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(successHttpResponse)) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response)) + .consumeNextWith(this::assertTimeoutError) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() { + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)) + .willReturn(Mono.just(successHttpResponse)); + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response)) + .expectComplete() + .verify(TIMEOUT); + } + + private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) { return plainTextMessages .map(message -> String.format("{\"message\":\"%s\"}", message)); } - private static HttpResponse createHttpResponse(String statusReason, int statusCode){ + private static HttpResponse createHttpResponse(String statusReason, int statusCode) { return ImmutableHttpResponse.builder() .statusCode(statusCode) .url(TOPIC_URL) @@ -418,7 +492,7 @@ class MessageRouterPublisherImplTest { .build(); } - private String collectNonEmptyRequestBody(HttpRequest httpRequest){ + private String collectNonEmptyRequestBody(HttpRequest httpRequest) { final String body = Flux.from(httpRequest.body().contents()) .collect(ByteBufAllocator.DEFAULT::compositeBuffer, (byteBufs, buffer) -> byteBufs.addComponent(true, buffer)) @@ -429,11 +503,11 @@ class MessageRouterPublisherImplTest { return body; } - private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){ + private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) { return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class); } - private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){ + private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) { return getAsJsonElements( List.of( collectNonEmptyRequestBody(httpRequest) @@ -442,8 +516,8 @@ class MessageRouterPublisherImplTest { ); } - private void assertListsContainSameElements(List<? extends JsonElement> actualMessages, - List<? extends JsonElement> expectedMessages){ + private void assertListsContainSameElements(List<? extends JsonElement> actualMessages, + List<? extends JsonElement> expectedMessages) { for (int i = 0; i < actualMessages.size(); i++) { assertThat(actualMessages.get(i)) .describedAs(String.format("Http request element at position %d", i)) @@ -452,7 +526,7 @@ class MessageRouterPublisherImplTest { } private void assertListsContainSameElements(JsonArray actualMessages, - List<? extends JsonElement> expectedMessages){ + List<? extends JsonElement> expectedMessages) { assertThat(actualMessages.size()).describedAs("Http request batch size") .isEqualTo(expectedMessages.size()); @@ -463,38 +537,32 @@ class MessageRouterPublisherImplTest { } } + private void assertTimeoutError(MessageRouterPublishResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.items()).isEmpty(); + assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER); + } + private void verifySingleResponse(List<? extends JsonElement> threeMessages, - Flux<MessageRouterPublishResponse> responses) { + Flux<MessageRouterPublishResponse> responses) { StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - threeMessages.get(0), - threeMessages.get(1), - threeMessages.get(2)); - }) + .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response)) .expectComplete() .verify(TIMEOUT); } private void verifyDoubleResponse(List<? extends JsonElement> threeMessages, - List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) { - + List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) { StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - threeMessages.get(0), - threeMessages.get(1), - threeMessages.get(2)); - }) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - twoMessages.get(0), - twoMessages.get(1)); - }) + .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response)) + .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response)) .expectComplete() .verify(TIMEOUT); } -}
\ No newline at end of file + + private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) { + assertThat(response.successful()).describedAs("successful").isTrue(); + JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new); + assertThat(response.items()).containsExactly(jsonElements); + } +} diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml index 20cade07..ab6641cb 100644 --- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -66,6 +66,17 @@ services: depends_on: - zookeeper - kafka + + toxiproxy: + image: shopify/toxiproxy:2.1.4 + ports: + - "8474:8474" + - "8666:8666" + networks: + - net + depends_on: + - dmaap + networks: net: driver: bridge |