aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java12
1 files changed, 9 insertions, 3 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 7d1b0a93..5f72808e 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
import java.time.Duration;
import java.util.stream.Collectors;
@@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
.map(httpResponse -> buildResponse(httpResponse, batch))
- .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+ .doOnError(ReadTimeoutException.class,
+ e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+ .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+ .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch)));
}
private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
@@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
: builder.failReason(extractFailReason(httpResponse)).build();
}
- private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterPublishResponse.builder()
.failReason(failReason)