diff options
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java')
-rw-r--r-- | rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 7d1b0a93..5f72808e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; @@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.ConnectException; import java.time.Duration; import java.util.stream.Collectors; @@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)) - .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); + .doOnError(ReadTimeoutException.class, + e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) + .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) + .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch))); } private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { @@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { : builder.failReason(extractFailReason(httpResponse)).build(); } - private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) |