summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java11
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java13
-rw-r--r--version.properties2
4 files changed, 7 insertions, 21 deletions
diff --git a/pom.xml b/pom.xml
index d3503969..81e409aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ limitations under the License.
</parent>
<groupId>org.onap.dcaegen2.collectors.ves</groupId>
<artifactId>VESCollector</artifactId>
- <version>1.3.1-SNAPSHOT</version>
+ <version>1.3.2-SNAPSHOT</version>
<name>dcaegen2-collectors-ves</name>
<description>VESCollector</description>
<properties>
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 7d27399d..0d150c51 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -50,7 +50,6 @@ public class EventProcessor implements Runnable {
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- public JSONObject event;
private EventPublisher eventPublisher;
private Map<String, String[]> streamidHash;
private ApplicationSettings properties;
@@ -66,7 +65,7 @@ public class EventProcessor implements Runnable {
public void run() {
try {
while (true) {
- event = VesApplication.fProcessingInputQueue.take();
+ JSONObject event = VesApplication.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
@@ -81,7 +80,7 @@ public class EventProcessor implements Runnable {
.onEmpty(() -> {
log.error("No StreamID defined for publish - Message dropped" + event);
}).forEach(streamIds -> {
- sendEventsToStreams(streamIds);
+ sendEventsToStreams(event, streamIds);
});
log.debug("Message published" + event);
}
@@ -91,7 +90,7 @@ public class EventProcessor implements Runnable {
}
}
- public void overrideEvent() {
+ public void overrideEvent(JSONObject event) {
// Set collector timestamp in event payload before publish
addCurrentTimeToEvent(event);
@@ -112,10 +111,10 @@ public class EventProcessor implements Runnable {
log.debug("Modified event:" + event);
}
- private void sendEventsToStreams(String[] streamIdList) {
+ private void sendEventsToStreams(JSONObject event, String[] streamIdList) {
for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
- this.overrideEvent();
+ overrideEvent(event);
eventPublisher.sendEvent(event, aStreamIdList);
}
}
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
index 697510c4..5641a927 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
@@ -63,19 +63,6 @@ public class EventProcessorTest {
}
@Test
- public void testLoad() {
- //given
- EventProcessor ec = new EventProcessor(mock(EventPublisher.class), properties);
- ec.event = new org.json.JSONObject(ev);
- //when
- ec.overrideEvent();
-
- //then
- Boolean hasSourceNameNode = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
- assertTrue(hasSourceNameNode);
- }
-
- @Test
public void shouldParseJsonEvents() throws ReflectiveOperationException {
//given
EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class), properties);
diff --git a/version.properties b/version.properties
index fee49286..ef20baaf 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=1
minor=3
-patch=1
+patch=2
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT