diff options
author | Tony Hansen <tony@att.com> | 2020-07-14 22:16:19 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-07-14 22:16:19 +0000 |
commit | 48a172a227f8674dc07c96303f5cc9b25beb31f3 (patch) | |
tree | ec5b3f6f40e421c506bda26cc3aee8eb18282610 /src/main/java/org | |
parent | 5111509c812150f036af629e4114801c02a6e8fb (diff) | |
parent | f2289c9d85db6659c2f3b9c46ecad6885959f4a8 (diff) |
Merge "Dynamic partition-key support"1.7.1
Diffstat (limited to 'src/main/java/org')
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index b00b2744..3fc9e254 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,6 +42,9 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; class DMaaPEventPublisher implements EventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String EVENT = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; private final Logger outputLogger; @@ -73,7 +76,9 @@ class DMaaPEventPublisher implements EventPublisher { private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) throws IOException { - int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + + String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); + int pendingMsgs = publisher.send(pk, event.toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); } |