diff options
author | tkogut <tomasz.kogut@nokia.com> | 2020-12-29 15:13:22 +0100 |
---|---|---|
committer | tkogut <tomasz.kogut@nokia.com> | 2020-12-30 16:48:26 +0100 |
commit | 91d17acfab525c96aee50ad14191c78f7e833376 (patch) | |
tree | 6f9a417ed318e45ff7b78a3b3b12f6ab62a5734d /rest-services/dmaap-client/src/main | |
parent | 8eaf72890a94eceddbbbdcf5015afffaa98176a7 (diff) |
Add timeout for Publisher(dmaap-client)
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae
Diffstat (limited to 'rest-services/dmaap-client/src/main')
9 files changed, 289 insertions, 13 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java new file mode 100644 index 00000000..57187c80 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface ClientError { + RequestError requestError(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java new file mode 100644 index 00000000..9754719e --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.value.Value; +import reactor.util.annotation.Nullable; + +import java.util.List; + +@Value.Immutable +public interface ClientErrorReason { + String header(); + + String messageId(); + + String text(); + + @Nullable List<String> variables(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java new file mode 100644 index 00000000..6b22b378 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.vavr.control.Option; + +public class ClientErrorReasonPresenter { + + private ClientErrorReasonPresenter() { } + + private static final Gson GSON = new GsonBuilder().create(); + private static final String PATTERN = "%s\n%s"; + + public static String present(ClientErrorReason clientErrorReason) { + ImmutableServiceException simpleServiceException = ImmutableServiceException.builder() + .messageId(clientErrorReason.messageId()) + .text(clientErrorReason.text()) + .build(); + ImmutableServiceException serviceException = Option.of(clientErrorReason.variables()) + .map(simpleServiceException::withVariables) + .getOrElse(simpleServiceException); + ImmutableRequestError requestError = ImmutableRequestError.builder() + .serviceException(serviceException) + .build(); + ClientError clientError = ImmutableClientError.builder() + .requestError(requestError) + .build(); + return String.format(PATTERN, clientErrorReason.header(), GSON.toJson(clientError)); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java new file mode 100644 index 00000000..5a51e5f2 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import java.util.Collections; + +public class ClientErrorReasons { + + private ClientErrorReasons() { } + + public static final ClientErrorReason TIMEOUT = ImmutableClientErrorReason.builder() + .header("408 Request Timeout") + .text("Client timeout exception occurred, Error code is %1") + .messageId("SVC0001") + .variables(Collections.singletonList("408")).build(); + +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java new file mode 100644 index 00000000..71e673fe --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface RequestError { + ServiceException serviceException(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java new file mode 100644 index 00000000..e99330ac --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import reactor.util.annotation.Nullable; + +import java.util.List; + +@Value.Immutable +@Gson.TypeAdapters +public interface ServiceException { + + String messageId(); + + String text(); + + @Nullable List<String> variables(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 191ec64f..16068da0 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,12 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; -import java.time.Duration; -import java.util.stream.Collectors; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -38,6 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; @@ -47,6 +48,11 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.stream.Collectors; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 @@ -77,29 +83,34 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) - .map(httpResponse -> buildResponse(httpResponse, batch)); + .map(httpResponse -> buildResponse(httpResponse, batch)) + .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); } private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { - if(contentType == ContentType.APPLICATION_JSON) { + if (contentType == ContentType.APPLICATION_JSON) { final JsonArray elements = new JsonArray(subItems.size()); subItems.forEach(elements::add); return RequestBody.fromJson(elements); - }else if(contentType == ContentType.TEXT_PLAIN){ + } else if (contentType == ContentType.TEXT_PLAIN) { String messages = subItems.map(JsonElement::toString) .collect(Collectors.joining("\n")); return RequestBody.fromString(messages); - }else throw new IllegalArgumentException("Unsupported content type: " + contentType); + } else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { - return ImmutableHttpRequest.builder() + ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) - .body(body) - .build(); + .body(body); + + return Option.of(request.timeoutConfig()) + .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build()) + .getOrElse(requestBuilder::build); } private MessageRouterPublishResponse buildResponse( @@ -111,4 +122,11 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); } + + private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + return Mono.just(ImmutableMessageRouterPublishResponse.builder() + .failReason(failReason) + .build()); + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 314756d8..4490c79f 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2020 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -33,6 +35,8 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); + @Nullable TimeoutConfig timeoutConfig(); + @Value.Default default ContentType contentType() { return ContentType.APPLICATION_JSON; diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java new file mode 100644 index 00000000..413bf8e5 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import org.immutables.value.Value; + +import java.time.Duration; + +@Value.Immutable +public interface TimeoutConfig { + + @Value.Default + default Duration getTimeout() { + return Duration.ofSeconds(4); + } +} |