diff options
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java')
-rw-r--r-- | rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index f09c5397..aa88b9ee 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -40,6 +40,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.Immutable import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -47,11 +49,11 @@ import reactor.core.publisher.Mono; * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -// TODO: This is a PoC. It's untested. public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final RxHttpClient httpClient; private final int maxBatchSize; private final Duration maxBatchDuration; + private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) { this.httpClient = httpClient; @@ -70,6 +72,8 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr( MessageRouterPublishRequest request, List<JsonElement> batch) { + LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); + LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, asJsonBody(batch))) .map(httpResponse -> buildResponse(httpResponse, batch)); } @@ -84,7 +88,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { return ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) - .diagnosticContext(request.diagnosticContext()) + .diagnosticContext(request.diagnosticContext().withNewInvocationId()) .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType())) .body(body) .build(); |