aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/EventProcessor.java')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java16
1 files changed, 5 insertions, 11 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 6dd2f5c8..06a27328 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -41,7 +41,7 @@ import java.util.HashMap;
import java.util.List;
-public class EventProcessor implements Runnable {
+class EventProcessor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final String EVENT_LITERAL = "event";
@@ -53,8 +53,8 @@ public class EventProcessor implements Runnable {
static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
- public EventProcessor() {
- streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamid);
+ EventProcessor() {
+ streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamID);
}
private Map<String, String[]> parseStreamIdToStreamHashMapping(String streamId) {
@@ -63,7 +63,6 @@ public class EventProcessor implements Runnable {
for (String aList : list) {
String domain = aList.split("=")[0];
String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
-
streamidHash.put(domain, streamIdList);
}
return streamidHash;
@@ -72,12 +71,9 @@ public class EventProcessor implements Runnable {
@Override
public void run() {
-
try {
-
- event = CommonStartup.fProcessingInputQueue.take();
-
- while (event != null) {
+ while (true) {
+ event = CommonStartup.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
@@ -97,8 +93,6 @@ public class EventProcessor implements Runnable {
sendEventsToStreams(streamIdList);
}
log.debug("Message published" + event);
- event = CommonStartup.fProcessingInputQueue.take();
-
}
} catch (InterruptedException e) {
log.error("EventProcessor InterruptedException" + e.getMessage());