summaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2020-12-29 15:13:22 +0100
committertkogut <tomasz.kogut@nokia.com>2020-12-30 16:48:26 +0100
commit91d17acfab525c96aee50ad14191c78f7e833376 (patch)
tree6f9a417ed318e45ff7b78a3b3b12f6ab62a5734d /rest-services/dmaap-client/src
parent8eaf72890a94eceddbbbdcf5015afffaa98176a7 (diff)
Add timeout for Publisher(dmaap-client)
Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae
Diffstat (limited to 'rest-services/dmaap-client/src')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java30
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java37
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java50
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java35
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java30
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java38
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java42
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java6
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java34
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java58
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java2
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java82
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java38
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java64
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java174
-rw-r--r--rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml11
16 files changed, 609 insertions, 122 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
new file mode 100644
index 00000000..57187c80
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ClientError {
+ RequestError requestError();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java
new file mode 100644
index 00000000..9754719e
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+public interface ClientErrorReason {
+ String header();
+
+ String messageId();
+
+ String text();
+
+ @Nullable List<String> variables();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
new file mode 100644
index 00000000..6b22b378
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.vavr.control.Option;
+
+public class ClientErrorReasonPresenter {
+
+ private ClientErrorReasonPresenter() { }
+
+ private static final Gson GSON = new GsonBuilder().create();
+ private static final String PATTERN = "%s\n%s";
+
+ public static String present(ClientErrorReason clientErrorReason) {
+ ImmutableServiceException simpleServiceException = ImmutableServiceException.builder()
+ .messageId(clientErrorReason.messageId())
+ .text(clientErrorReason.text())
+ .build();
+ ImmutableServiceException serviceException = Option.of(clientErrorReason.variables())
+ .map(simpleServiceException::withVariables)
+ .getOrElse(simpleServiceException);
+ ImmutableRequestError requestError = ImmutableRequestError.builder()
+ .serviceException(serviceException)
+ .build();
+ ClientError clientError = ImmutableClientError.builder()
+ .requestError(requestError)
+ .build();
+ return String.format(PATTERN, clientErrorReason.header(), GSON.toJson(clientError));
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
new file mode 100644
index 00000000..5a51e5f2
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import java.util.Collections;
+
+public class ClientErrorReasons {
+
+ private ClientErrorReasons() { }
+
+ public static final ClientErrorReason TIMEOUT = ImmutableClientErrorReason.builder()
+ .header("408 Request Timeout")
+ .text("Client timeout exception occurred, Error code is %1")
+ .messageId("SVC0001")
+ .variables(Collections.singletonList("408")).build();
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
new file mode 100644
index 00000000..71e673fe
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface RequestError {
+ ServiceException serviceException();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
new file mode 100644
index 00000000..e99330ac
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ServiceException {
+
+ String messageId();
+
+ String text();
+
+ @Nullable List<String> variables();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 191ec64f..16068da0 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,12 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
-
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
+import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
-import java.time.Duration;
-import java.util.stream.Collectors;
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -38,6 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
@@ -47,6 +48,11 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.Duration;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
@@ -77,29 +83,34 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
- .map(httpResponse -> buildResponse(httpResponse, batch));
+ .map(httpResponse -> buildResponse(httpResponse, batch))
+ .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
}
private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
- if(contentType == ContentType.APPLICATION_JSON) {
+ if (contentType == ContentType.APPLICATION_JSON) {
final JsonArray elements = new JsonArray(subItems.size());
subItems.forEach(elements::add);
return RequestBody.fromJson(elements);
- }else if(contentType == ContentType.TEXT_PLAIN){
+ } else if (contentType == ContentType.TEXT_PLAIN) {
String messages = subItems.map(JsonElement::toString)
.collect(Collectors.joining("\n"));
return RequestBody.fromString(messages);
- }else throw new IllegalArgumentException("Unsupported content type: " + contentType);
+ } else throw new IllegalArgumentException("Unsupported content type: " + contentType);
}
private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
- return ImmutableHttpRequest.builder()
+ ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
.diagnosticContext(request.diagnosticContext().withNewInvocationId())
.customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
- .body(body)
- .build();
+ .body(body);
+
+ return Option.of(request.timeoutConfig())
+ .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+ .getOrElse(requestBuilder::build);
}
private MessageRouterPublishResponse buildResponse(
@@ -111,4 +122,11 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
? builder.items(batch).build()
: builder.failReason(extractFailReason(httpResponse)).build();
}
+
+ private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ return Mono.just(ImmutableMessageRouterPublishResponse.builder()
+ .failReason(failReason)
+ .build());
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
index 314756d8..4490c79f 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +21,10 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -33,6 +35,8 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
MessageRouterSink sinkDefinition();
+ @Nullable TimeoutConfig timeoutConfig();
+
@Value.Default
default ContentType contentType() {
return ContentType.APPLICATION_JSON;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
new file mode 100644
index 00000000..413bf8e5
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.immutables.value.Value;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface TimeoutConfig {
+
+ @Value.Default
+ default Duration getTimeout() {
+ return Duration.ofSeconds(4);
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
index 8561e0b0..d94639a5 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
@@ -28,7 +28,6 @@ import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
@@ -39,30 +38,39 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig;
import reactor.core.publisher.Flux;
+import java.time.Duration;
+
public final class MessageRouterTestsUtils {
- private MessageRouterTestsUtils() {}
+ private MessageRouterTestsUtils() {
+ }
- public static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
}
- public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){
- MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(topicUrl)
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl, Duration timeout) {
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(createMessageRouterSink(topicUrl))
+ .contentType(ContentType.APPLICATION_JSON)
+ .timeoutConfig(ImmutableTimeoutConfig.builder()
+ .timeout(timeout)
+ .build())
.build();
+ }
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType) {
return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
+ .sinkDefinition(createMessageRouterSink(topicUrl))
.contentType(contentType)
.build();
}
public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
- String consumerGroup, String consumerId) {
+ String consumerGroup, String consumerId) {
ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
.name("the topic")
.topicUrl(topicUrl)
@@ -76,52 +84,53 @@ public final class MessageRouterTestsUtils {
.build();
}
- public static List<JsonElement> getAsJsonElements(List<String> messages){
+ public static List<JsonElement> getAsJsonElements(List<String> messages) {
return messages.map(JsonParser::parseString);
}
- public static List<JsonObject> getAsJsonObjects(List<String> messages){
+ public static List<JsonObject> getAsJsonObjects(List<String> messages) {
return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
}
- public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){
+ public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages) {
return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
}
- public static JsonObject getAsJsonObject(String item){
+ public static JsonObject getAsJsonObject(String item) {
return new Gson().fromJson(item, JsonObject.class);
}
- public static Flux<JsonElement> plainBatch(List<String> messages){
+ public static Flux<JsonElement> plainBatch(List<String> messages) {
return Flux.fromIterable(getAsJsonElements(messages));
}
- public static Flux<JsonObject> jsonBatch(List<String> messages){
+ public static Flux<JsonObject> jsonBatch(List<String> messages) {
return Flux.fromIterable(getAsJsonObjects(messages));
}
- public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+ public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) {
return ImmutableMessageRouterSubscribeResponse
.builder()
.failReason(String.format(failReasonFormat, formatArgs))
.build();
}
- public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+ public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items) {
return ImmutableMessageRouterSubscribeResponse
.builder()
.items(items)
.build();
}
- public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+ public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
+ String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
return ImmutableMessageRouterPublishResponse
.builder()
- .failReason(String.format(failReasonFormat, formatArgs))
+ .failReason(failReason)
.build();
}
- public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+ public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
return ImmutableMessageRouterPublishResponse
.builder()
.items(items)
@@ -129,7 +138,7 @@ public final class MessageRouterTestsUtils {
}
public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
- MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+ MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
@@ -137,4 +146,11 @@ public final class MessageRouterTestsUtils {
publisher.put(publishRequest, jsonMessageBatch).blockLast();
subscriber.get(subscribeRequest).block();
}
+
+ private static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+ return ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
index a314ccf1..494ca62a 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
@@ -31,6 +31,8 @@ final class DMaapContainer {
MR_COMPOSE_RESOURCE_NAME);
static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
static final String DMAAP_SERVICE_NAME = "dmaap";
+ static final int PROXY_SERVICE_EXPOSED_PORT = 8666;
+ static final String LOCALHOST = "localhost";
private DMaapContainer() {}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index f62359dd..24cd2c34 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -39,8 +41,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.io.IOException;
import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
@@ -50,6 +55,10 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
@Testcontainers
class MessageRouterPublisherIT {
@@ -64,23 +73,38 @@ class MessageRouterPublisherIT {
+ "Successfully published number of messages :0."
+ "Expected { to start an object.\",\"status\":400"
+ "}";
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ + "{"
+ + "\"requestError\":"
+ + "{"
+ + "\"serviceException\":"
+ + "{"
+ + "\"messageId\":\"SVC0001\","
+ + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+ + "\"variables\":[\"408\"]"
+ + "}"
+ + "}"
+ + "}";
+ private static Proxy DMAAP_PROXY;
private static String EVENTS_PATH;
+ private static String PROXY_EVENTS_PATH;
private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber subscriber = DmaapClientFactory
+ private final MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@BeforeAll
- static void setUp() {
- EVENTS_PATH = String.format("http://%s:%d/events",
- CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
- DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
- CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
- DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
+ static void setUp() throws IOException {
+ EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
+ PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+
+ DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
+ String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
+ String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
- void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
+ void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
final String topic = "TOPIC";
final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
@@ -100,7 +124,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleBadRequestError(){
+ void publisher_shouldHandleBadRequestError() {
//given
final String topic = "TOPIC2";
final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
@@ -120,7 +144,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishSingleMessage(){
+ void publisher_shouldSuccessfullyPublishSingleMessage() {
//given
final String topic = "TOPIC3";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -145,7 +169,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishMultipleMessages(){
+ void publisher_shouldSuccessfullyPublishMultipleMessages() {
final String topic = "TOPIC5";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
@@ -170,7 +194,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
+ void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
//given
final String topic = "TOPIC6";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -197,7 +221,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
+ void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
//given
final String topic = "TOPIC7";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -224,7 +248,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
+ void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
//given
final String topic = "TOPIC8";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -251,7 +275,7 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
+ void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
//given
final String topic = "TOPIC9";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -276,4 +300,30 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify();
}
+
+ @Test
+ void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
+ //given
+ final String toxic = "latency-toxic";
+ DMAAP_PROXY.toxics()
+ .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
+ final String topic = "TOPIC10";
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(
+ String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ //cleanup
+ DMAAP_PROXY.toxics().get(toxic).remove();
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index 1268a16a..b0a07eda 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,15 +20,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-
-import java.time.Duration;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
@@ -44,6 +38,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.time.Duration;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
@@ -69,18 +68,19 @@ class MessageRouterPublisherTest {
@BeforeAll
static void setUp() {
- server = DummyHttpServer.start(routes ->
- routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
- .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
- sendError(resp, 400, ERROR_MESSAGE))
- .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
- sendError(resp, 404, ERROR_MESSAGE))
- .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE))
+ server = DummyHttpServer.start(routes -> routes
+ .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) ->
+ sendString(resp, Mono.just("OK")))
+ .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
+ sendError(resp, 400, ERROR_MESSAGE))
+ .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
+ sendError(resp, 401, ERROR_MESSAGE))
+ .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
+ sendError(resp, 403, ERROR_MESSAGE))
+ .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
+ sendError(resp, 404, ERROR_MESSAGE))
+ .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
+ sendError(resp, 500, ERROR_MESSAGE))
);
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
new file mode 100644
index 00000000..9b318d73
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ClientErrorReasonPresenterTest {
+
+ @Test
+ void shouldSuccessfullyPresent() {
+ //given
+ ClientErrorReason clientErrorReason = createSimple();
+ String expected = "header\n"
+ + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}";
+
+ //when
+ String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+
+ //then
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ void shouldSuccessfullyPresentWithVariables() {
+ //given
+ ClientErrorReason clientErrorReason = createSimple().withVariables("v1", "v2");
+ String expected = "header\n"
+ + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}";
+
+ //when
+ String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+
+ //then
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private ImmutableClientErrorReason createSimple() {
+ return ImmutableClientErrorReason.builder()
+ .header("header")
+ .messageId("messageId")
+ .text("text")
+ .build();
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index 38659acd..f29bfa27 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,24 +20,15 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
-
+import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.Gson;
import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.List;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -54,6 +45,22 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since April 2019
@@ -62,6 +69,7 @@ class MessageRouterPublisherImplTest {
private static final Duration TIMEOUT = Duration.ofSeconds(5);
private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
private static final int MAX_BATCH_SIZE = 3;
+ public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
private final RxHttpClient httpClient = mock(RxHttpClient.class);
private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
@@ -87,11 +95,11 @@ class MessageRouterPublisherImplTest {
assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
assertThat(httpRequest.body()).isNotNull();
- assertThat(httpRequest.body().length()).isGreaterThan(0);
+ assertThat(httpRequest.body().length()).isPositive();
}
@Test
- void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+ void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
// given
final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
@@ -115,9 +123,8 @@ class MessageRouterPublisherImplTest {
}
-
@Test
- void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+ void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
// given
final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
@@ -143,7 +150,7 @@ class MessageRouterPublisherImplTest {
}
@Test
- void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+ void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
// given
final List<String> threePlainMessages = List.of("I", "like", "cookies");
final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
@@ -168,7 +175,7 @@ class MessageRouterPublisherImplTest {
}
@Test
- void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
+ void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
// given
final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
@@ -267,7 +274,7 @@ class MessageRouterPublisherImplTest {
final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
assertThat(secondRequest.size()).describedAs("Http request second batch size")
- .isEqualTo(MAX_BATCH_SIZE-1);
+ .isEqualTo(MAX_BATCH_SIZE - 1);
assertListsContainSameElements(secondRequest, parsedTwoMessages);
}
@@ -303,7 +310,7 @@ class MessageRouterPublisherImplTest {
final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
.map(JsonElement::getAsJsonObject);
assertThat(secondRequest.size()).describedAs("Http request second batch size")
- .isEqualTo(MAX_BATCH_SIZE-1);
+ .isEqualTo(MAX_BATCH_SIZE - 1);
assertListsContainSameElements(secondRequest, parsedTwoMessages);
}
@@ -339,7 +346,7 @@ class MessageRouterPublisherImplTest {
final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
.map(JsonElement::getAsJsonPrimitive);
assertThat(secondRequest.size()).describedAs("Http request second batch size")
- .isEqualTo(MAX_BATCH_SIZE-1);
+ .isEqualTo(MAX_BATCH_SIZE - 1);
assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
}
@@ -404,12 +411,79 @@ class MessageRouterPublisherImplTest {
verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
}
- private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+ @Test
+ void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertTimeoutError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(successHttpResponse))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
+ .consumeNextWith(this::assertTimeoutError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
+ .willReturn(Mono.just(successHttpResponse));
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
return plainTextMessages
.map(message -> String.format("{\"message\":\"%s\"}", message));
}
- private static HttpResponse createHttpResponse(String statusReason, int statusCode){
+ private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
return ImmutableHttpResponse.builder()
.statusCode(statusCode)
.url(TOPIC_URL)
@@ -418,7 +492,7 @@ class MessageRouterPublisherImplTest {
.build();
}
- private String collectNonEmptyRequestBody(HttpRequest httpRequest){
+ private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
final String body = Flux.from(httpRequest.body().contents())
.collect(ByteBufAllocator.DEFAULT::compositeBuffer,
(byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
@@ -429,11 +503,11 @@ class MessageRouterPublisherImplTest {
return body;
}
- private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
+ private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
}
- private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
+ private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
return getAsJsonElements(
List.of(
collectNonEmptyRequestBody(httpRequest)
@@ -442,8 +516,8 @@ class MessageRouterPublisherImplTest {
);
}
- private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
- List<? extends JsonElement> expectedMessages){
+ private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
+ List<? extends JsonElement> expectedMessages) {
for (int i = 0; i < actualMessages.size(); i++) {
assertThat(actualMessages.get(i))
.describedAs(String.format("Http request element at position %d", i))
@@ -452,7 +526,7 @@ class MessageRouterPublisherImplTest {
}
private void assertListsContainSameElements(JsonArray actualMessages,
- List<? extends JsonElement> expectedMessages){
+ List<? extends JsonElement> expectedMessages) {
assertThat(actualMessages.size()).describedAs("Http request batch size")
.isEqualTo(expectedMessages.size());
@@ -463,38 +537,32 @@ class MessageRouterPublisherImplTest {
}
}
+ private void assertTimeoutError(MessageRouterPublishResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.items()).isEmpty();
+ assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
+ }
+
private void verifySingleResponse(List<? extends JsonElement> threeMessages,
- Flux<MessageRouterPublishResponse> responses) {
+ Flux<MessageRouterPublishResponse> responses) {
StepVerifier.create(responses)
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- threeMessages.get(0),
- threeMessages.get(1),
- threeMessages.get(2));
- })
+ .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
.expectComplete()
.verify(TIMEOUT);
}
private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
- List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
-
+ List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
StepVerifier.create(responses)
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- threeMessages.get(0),
- threeMessages.get(1),
- threeMessages.get(2));
- })
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- twoMessages.get(0),
- twoMessages.get(1));
- })
+ .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
+ .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
.expectComplete()
.verify(TIMEOUT);
}
-} \ No newline at end of file
+
+ private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
+ assertThat(response.successful()).describedAs("successful").isTrue();
+ JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
+ assertThat(response.items()).containsExactly(jsonElements);
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
index 20cade07..ab6641cb 100644
--- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
+++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
@@ -66,6 +66,17 @@ services:
depends_on:
- zookeeper
- kafka
+
+ toxiproxy:
+ image: shopify/toxiproxy:2.1.4
+ ports:
+ - "8474:8474"
+ - "8666:8666"
+ networks:
+ - net
+ depends_on:
+ - dmaap
+
networks:
net:
driver: bridge