aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java29
1 files changed, 24 insertions, 5 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index 72c0bad3..f7ccf4f2 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,8 +26,11 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
+
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -35,6 +38,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
@@ -59,16 +65,22 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
@Override
public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
- return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
+ return httpClient.call(buildGetHttpRequest(request))
+ .map(this::buildGetResponse)
+ .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
}
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
- return ImmutableHttpRequest.builder()
+ ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
.url(buildSubscribeUrl(request))
- .diagnosticContext(request.diagnosticContext().withNewInvocationId())
- .build();
+ .diagnosticContext(request.diagnosticContext().withNewInvocationId());
+
+ return Option.of(request.timeoutConfig())
+ .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+ .getOrElse(requestBuilder::build);
}
private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
@@ -90,4 +102,11 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
request.consumerId());
}
+
+ private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason(failReason)
+ .build());
+ }
}