aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java8
1 files changed, 6 insertions, 2 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index f09c5397..aa88b9ee 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -40,6 +40,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.Immutable
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -47,11 +49,11 @@ import reactor.core.publisher.Mono;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
*/
-// TODO: This is a PoC. It's untested.
public class MessageRouterPublisherImpl implements MessageRouterPublisher {
private final RxHttpClient httpClient;
private final int maxBatchSize;
private final Duration maxBatchDuration;
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) {
this.httpClient = httpClient;
@@ -70,6 +72,8 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr(
MessageRouterPublishRequest request,
List<JsonElement> batch) {
+ LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
+ LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, asJsonBody(batch)))
.map(httpResponse -> buildResponse(httpResponse, batch));
}
@@ -84,7 +88,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
return ImmutableHttpRequest.builder()
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
- .diagnosticContext(request.diagnosticContext())
+ .diagnosticContext(request.diagnosticContext().withNewInvocationId())
.customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
.body(body)
.build();