From bcec586b8261ee99285973fd2640a6cc4c25451b Mon Sep 17 00:00:00 2001 From: Zlatko Murgoski Date: Mon, 31 Dec 2018 11:29:52 +0100 Subject: Fix bug with processing event list DMaapEventPublisher processing of eventList Change-Id: I3aa31dd55aefdc5130fcc36712f20bd15c3f7cce Issue-ID: DCAEGEN2-1035 Signed-off-by: Zlatko Murgoski --- pom.xml | 2 +- .../java/org/onap/dcae/commonFunction/EventProcessor.java | 11 +++++------ .../org/onap/dcae/commonFunction/EventProcessorTest.java | 13 ------------- version.properties | 2 +- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/pom.xml b/pom.xml index d3503969..81e409aa 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ limitations under the License. org.onap.dcaegen2.collectors.ves VESCollector - 1.3.1-SNAPSHOT + 1.3.2-SNAPSHOT dcaegen2-collectors-ves VESCollector diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 7d27399d..0d150c51 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -50,7 +50,6 @@ public class EventProcessor implements Runnable { private static final String COMMON_EVENT_HEADER = "commonEventHeader"; private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - public JSONObject event; private EventPublisher eventPublisher; private Map streamidHash; private ApplicationSettings properties; @@ -66,7 +65,7 @@ public class EventProcessor implements Runnable { public void run() { try { while (true) { - event = VesApplication.fProcessingInputQueue.take(); + JSONObject event = VesApplication.fProcessingInputQueue.take(); // As long as the producer is running we remove elements from // the queue. log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); @@ -81,7 +80,7 @@ public class EventProcessor implements Runnable { .onEmpty(() -> { log.error("No StreamID defined for publish - Message dropped" + event); }).forEach(streamIds -> { - sendEventsToStreams(streamIds); + sendEventsToStreams(event, streamIds); }); log.debug("Message published" + event); } @@ -91,7 +90,7 @@ public class EventProcessor implements Runnable { } } - public void overrideEvent() { + public void overrideEvent(JSONObject event) { // Set collector timestamp in event payload before publish addCurrentTimeToEvent(event); @@ -112,10 +111,10 @@ public class EventProcessor implements Runnable { log.debug("Modified event:" + event); } - private void sendEventsToStreams(String[] streamIdList) { + private void sendEventsToStreams(JSONObject event, String[] streamIdList) { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); - this.overrideEvent(); + overrideEvent(event); eventPublisher.sendEvent(event, aStreamIdList); } } diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java index 697510c4..5641a927 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java @@ -62,19 +62,6 @@ public class EventProcessorTest { properties = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine); } - @Test - public void testLoad() { - //given - EventProcessor ec = new EventProcessor(mock(EventPublisher.class), properties); - ec.event = new org.json.JSONObject(ev); - //when - ec.overrideEvent(); - - //then - Boolean hasSourceNameNode = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName"); - assertTrue(hasSourceNameNode); - } - @Test public void shouldParseJsonEvents() throws ReflectiveOperationException { //given diff --git a/version.properties b/version.properties index fee49286..ef20baaf 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=3 -patch=1 +patch=2 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg