aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java9
1 files changed, 7 insertions, 2 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index b00b2744..3fc9e254 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,6 +42,9 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
class DMaaPEventPublisher implements EventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final String EVENT = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ private static final String PARTITION_KEY = "sourceName";
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
private final Logger outputLogger;
@@ -73,7 +76,9 @@ class DMaaPEventPublisher implements EventPublisher {
private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
throws IOException {
- int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+
+ String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
+ int pendingMsgs = publisher.send(pk, event.toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
}