summaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2020-12-29 15:13:22 +0100
committertkogut <tomasz.kogut@nokia.com>2020-12-30 16:48:26 +0100
commit91d17acfab525c96aee50ad14191c78f7e833376 (patch)
tree6f9a417ed318e45ff7b78a3b3b12f6ab62a5734d /rest-services/dmaap-client/src/main
parent8eaf72890a94eceddbbbdcf5015afffaa98176a7 (diff)
Add timeout for Publisher(dmaap-client)
Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae
Diffstat (limited to 'rest-services/dmaap-client/src/main')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java30
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java37
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java50
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java35
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java30
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java38
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java42
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java6
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java34
9 files changed, 289 insertions, 13 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
new file mode 100644
index 00000000..57187c80
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ClientError {
+ RequestError requestError();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java
new file mode 100644
index 00000000..9754719e
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+public interface ClientErrorReason {
+ String header();
+
+ String messageId();
+
+ String text();
+
+ @Nullable List<String> variables();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
new file mode 100644
index 00000000..6b22b378
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.vavr.control.Option;
+
+public class ClientErrorReasonPresenter {
+
+ private ClientErrorReasonPresenter() { }
+
+ private static final Gson GSON = new GsonBuilder().create();
+ private static final String PATTERN = "%s\n%s";
+
+ public static String present(ClientErrorReason clientErrorReason) {
+ ImmutableServiceException simpleServiceException = ImmutableServiceException.builder()
+ .messageId(clientErrorReason.messageId())
+ .text(clientErrorReason.text())
+ .build();
+ ImmutableServiceException serviceException = Option.of(clientErrorReason.variables())
+ .map(simpleServiceException::withVariables)
+ .getOrElse(simpleServiceException);
+ ImmutableRequestError requestError = ImmutableRequestError.builder()
+ .serviceException(serviceException)
+ .build();
+ ClientError clientError = ImmutableClientError.builder()
+ .requestError(requestError)
+ .build();
+ return String.format(PATTERN, clientErrorReason.header(), GSON.toJson(clientError));
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
new file mode 100644
index 00000000..5a51e5f2
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import java.util.Collections;
+
+public class ClientErrorReasons {
+
+ private ClientErrorReasons() { }
+
+ public static final ClientErrorReason TIMEOUT = ImmutableClientErrorReason.builder()
+ .header("408 Request Timeout")
+ .text("Client timeout exception occurred, Error code is %1")
+ .messageId("SVC0001")
+ .variables(Collections.singletonList("408")).build();
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
new file mode 100644
index 00000000..71e673fe
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface RequestError {
+ ServiceException serviceException();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
new file mode 100644
index 00000000..e99330ac
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ServiceException {
+
+ String messageId();
+
+ String text();
+
+ @Nullable List<String> variables();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 191ec64f..16068da0 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,12 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
-
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
+import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
-import java.time.Duration;
-import java.util.stream.Collectors;
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -38,6 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
@@ -47,6 +48,11 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.Duration;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
@@ -77,29 +83,34 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
- .map(httpResponse -> buildResponse(httpResponse, batch));
+ .map(httpResponse -> buildResponse(httpResponse, batch))
+ .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
}
private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
- if(contentType == ContentType.APPLICATION_JSON) {
+ if (contentType == ContentType.APPLICATION_JSON) {
final JsonArray elements = new JsonArray(subItems.size());
subItems.forEach(elements::add);
return RequestBody.fromJson(elements);
- }else if(contentType == ContentType.TEXT_PLAIN){
+ } else if (contentType == ContentType.TEXT_PLAIN) {
String messages = subItems.map(JsonElement::toString)
.collect(Collectors.joining("\n"));
return RequestBody.fromString(messages);
- }else throw new IllegalArgumentException("Unsupported content type: " + contentType);
+ } else throw new IllegalArgumentException("Unsupported content type: " + contentType);
}
private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
- return ImmutableHttpRequest.builder()
+ ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
.diagnosticContext(request.diagnosticContext().withNewInvocationId())
.customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
- .body(body)
- .build();
+ .body(body);
+
+ return Option.of(request.timeoutConfig())
+ .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+ .getOrElse(requestBuilder::build);
}
private MessageRouterPublishResponse buildResponse(
@@ -111,4 +122,11 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
? builder.items(batch).build()
: builder.failReason(extractFailReason(httpResponse)).build();
}
+
+ private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ return Mono.just(ImmutableMessageRouterPublishResponse.builder()
+ .failReason(failReason)
+ .build());
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
index 314756d8..4490c79f 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +21,10 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -33,6 +35,8 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
MessageRouterSink sinkDefinition();
+ @Nullable TimeoutConfig timeoutConfig();
+
@Value.Default
default ContentType contentType() {
return ContentType.APPLICATION_JSON;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
new file mode 100644
index 00000000..413bf8e5
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.immutables.value.Value;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface TimeoutConfig {
+
+ @Value.Default
+ default Duration getTimeout() {
+ return Duration.ofSeconds(4);
+ }
+}