summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/EventProcessor.java')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java43
1 files changed, 22 insertions, 21 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index a57ea3f0..7d27399d 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -25,7 +25,10 @@ import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
+import io.vavr.collection.Map;
import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.VesApplication;
import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,37 +38,38 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-class EventProcessor implements Runnable {
+public class EventProcessor implements Runnable {
+ static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {
+ }.getType();
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final String EVENT_LITERAL = "event";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
private EventPublisher eventPublisher;
+ private Map<String, String[]> streamidHash;
+ private ApplicationSettings properties;
- public EventProcessor(EventPublisher eventPublisher) {
+
+ public EventProcessor(EventPublisher eventPublisher, ApplicationSettings properties) {
this.eventPublisher = eventPublisher;
- streamidHash = CommonStartup.streamID.toJavaMap();
+ this.properties = properties;
+ this.streamidHash = properties.dMaaPStreamsMapping();
}
@Override
public void run() {
try {
while (true) {
- event = CommonStartup.fProcessingInputQueue.take();
+ event = VesApplication.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
- log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+ log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
String uuid = event.get("VESuniqueId").toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
@@ -73,14 +77,12 @@ class EventProcessor implements Runnable {
String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain);
- String[] streamIdList = streamidHash.get(domain);
- log.debug("streamIdList:" + Arrays.toString(streamIdList));
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event);
- } else {
- sendEventsToStreams(streamIdList);
- }
+ streamidHash.get(domain)
+ .onEmpty(() -> {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ }).forEach(streamIds -> {
+ sendEventsToStreams(streamIds);
+ });
log.debug("Message published" + event);
}
} catch (InterruptedException e) {
@@ -93,7 +95,7 @@ class EventProcessor implements Runnable {
// Set collector timestamp in event payload before publish
addCurrentTimeToEvent(event);
- if (CommonStartup.eventTransformFlag) {
+ if (properties.eventTransformingEnabled()) {
// read the mapping json file
try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
log.info("parse eventTransform.json");
@@ -168,5 +170,4 @@ class EventProcessor implements Runnable {
method.invoke(configProcessors, parameter);
}
}
-}
-
+} \ No newline at end of file