diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2020-07-14 18:34:45 +0000 |
---|---|---|
committer | Vijay Venkatesh Kumar <vv770d@att.com> | 2020-07-14 18:35:19 +0000 |
commit | f2289c9d85db6659c2f3b9c46ecad6885959f4a8 (patch) | |
tree | 6c87b0893b7b235a8951fe96e7f1fc448f960e40 | |
parent | a0d740ab9ad912986f95e68177449e1e3d015be7 (diff) |
Dynamic partition-key support
Change-Id: I6c5faf4a610813e1382da55f62b5a0f9da6db506
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Issue-ID: DCAEGEN2-1484
-rw-r--r-- | Changelog.md | 2 | ||||
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java | 9 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java | 13 | ||||
-rw-r--r-- | version.properties | 2 |
5 files changed, 19 insertions, 9 deletions
diff --git a/Changelog.md b/Changelog.md index a87250e9..02c72610 100644 --- a/Changelog.md +++ b/Changelog.md @@ -16,3 +16,5 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - WebMvcConfig ## [1.7.0] - 09/07/2020 - [DCAEGEN2-2254](https://jira.onap.org/browse/DCAEGEN2-2254) - Update schema to CommonEventFormat_30.2_ONAP in the eventListerner/v7 interface +## [1.7.1] - 13/07/2020 + - [DCAEGEN2-1484](https://jira.onap.org/browse/DCAEGEN2-1484) - VESCollector DMaap publish optimization @@ -24,7 +24,7 @@ </parent>
<groupId>org.onap.dcaegen2.collectors.ves</groupId>
<artifactId>VESCollector</artifactId>
- <version>1.7.0-SNAPSHOT</version>
+ <version>1.7.1-SNAPSHOT</version>
<name>dcaegen2-collectors-ves</name>
<description>VESCollector</description>
<properties>
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index b00b2744..3fc9e254 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,6 +42,9 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; class DMaaPEventPublisher implements EventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String EVENT = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; private final Logger outputLogger; @@ -73,7 +76,9 @@ class DMaaPEventPublisher implements EventPublisher { private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) throws IOException { - int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + + String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); + int pendingMsgs = publisher.send(pk, event.toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); } diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java index 809ac99c..45cdf282 100644 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java +++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java @@ -3,6 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2020 AT&T. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,26 +53,28 @@ public class DMaaPEventPublisherTest { @Test public void shouldSendEventToTopic() throws Exception { // given - JSONObject event = new JSONObject("{}"); + JSONObject event = new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); + // when eventPublisher.sendEvent(event, STREAM_ID); // then - verify(cambriaPublisher).send("MyPartitionKey", event.toString()); + verify(cambriaPublisher).send("dns01cmd004", event.toString()); } + @Test public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { // given JSONObject event = new JSONObject( - "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); + "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); // when eventPublisher.sendEvent(event, STREAM_ID); // then - verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); + verify(cambriaPublisher).send("dns01cmd004", new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}").toString()); } @Test @@ -86,4 +89,4 @@ public class DMaaPEventPublisherTest { // then verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); } -}
\ No newline at end of file +} diff --git a/version.properties b/version.properties index 24828c2f..cedaba1f 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=7 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |