diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common')
-rw-r--r-- | src/main/java/org/onap/dcae/common/EventProcessor.java | 61 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/common/EventSender.java | 94 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/common/EventUpdater.java | 137 |
3 files changed, 161 insertions, 131 deletions
diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java deleted file mode 100644 index bf3bf70d..00000000 --- a/src/main/java/org/onap/dcae/common/EventProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import org.json.JSONObject;
-import org.onap.dcae.VesApplication;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventProcessor implements Runnable {
-
- private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
- private EventSender eventSender;
-
- public EventProcessor(EventSender eventSender) {
- this.eventSender = eventSender;
- }
-
- @Override
- public void run() {
- try {
- while (true){
- JSONObject event = VesApplication.fProcessingInputQueue.take();
- log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
- setLoggingContext(event);
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event));
- eventSender.send(event);
- log.debug("Message published" + event);
- }
- } catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
- Thread.currentThread().interrupt();
- }
- }
-
- private void setLoggingContext(JSONObject event) {
- LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString());
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- }
-}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 48268d6c..c1002af6 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -20,17 +20,12 @@ */ package org.onap.dcae.common; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.publishing.EventPublisher; import org.slf4j.Logger; @@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory; public class EventSender { - private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private Map<String, String[]> streamidHash; - private ApplicationSettings properties; private EventPublisher eventPublisher; - - private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType(); + private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final Logger log = LoggerFactory.getLogger(EventSender.class); private static final String EVENT_LITERAL = "event"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { this.eventPublisher = eventPublisher; this.streamidHash = properties.dMaaPStreamsMapping(); - this.properties = properties; - } - public void send(JSONObject event) { - streamidHash.get(getDomain(event)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) - .forEach(streamIds -> sendEventsToStreams(event, streamIds)); + public void send(JSONArray arrayOfEvents) { + for (int i = 0; i < arrayOfEvents.length(); i++) { + metriclog.info("EVENT_PUBLISH_START"); + JSONObject object = (JSONObject) arrayOfEvents.get(i); + setLoggingContext(object); + streamidHash.get(getDomain(object)) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object)) + .forEach(streamIds -> sendEventsToStreams(object, streamIds)); + log.debug("Message published" + object); + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + metriclog.info("EVENT_PUBLISH_END"); } - public static String getDomain(JSONObject event) { + private static String getDomain(JSONObject event) { return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); } private void sendEventsToStreams(JSONObject event, String[] streamIdList) { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); } } - private JSONObject overrideEvent(JSONObject event) { - JSONObject jsonObject = addCurrentTimeToEvent(event); - if (properties.eventTransformingEnabled()) { - try (FileReader fr = new FileReader("./etc/eventTransform.json")) { - log.info("parse eventTransform.json"); - List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE); - parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); - } catch (IOException e) { - log.error(COULD_NOT_FIND_FILE, e); - throw new ApplicationException(COULD_NOT_FIND_FILE, e); - } - } - if (jsonObject.has("VESversion")) - jsonObject.remove("VESversion"); - - log.debug("Modified event:" + jsonObject); - return jsonObject; - } - - private JSONObject addCurrentTimeToEvent(JSONObject event) { - final Date currentTime = new Date(); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - return event; - } - - private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { - for (Event eventTransform : eventsTransform) { - JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); - if (configProcessorAdapter.isFilterMet(filterObj)) { - callProcessorsMethod(configProcessorAdapter, eventTransform.processors); - } - } - } - - private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) { - for (Processor processor : processors) { - final String functionName = processor.functionName; - final JSONObject args = new JSONObject(processor.args.toString()); - log.info(String.format("functionName==%s | args==%s", functionName, args)); - try { - configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); - } catch (ReflectiveOperationException e) { - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } - } + private void setLoggingContext(JSONObject event) { + LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event)); } } diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java new file mode 100644 index 00000000..1caa4f18 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventUpdater.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventUpdater { + + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String VES_VERSION = "VESversion"; + private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType(); + private static final Logger log = LoggerFactory.getLogger(EventSender.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String EVENT_TRANSFORM = "./etc/eventTransform.json"; + private ApplicationSettings settings; + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + public EventUpdater(ApplicationSettings settings) { + this.settings = settings; + } + + public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){ + if(type.equalsIgnoreCase(EVENT_LIST)){ + return convertEvents(jsonObject, uuid.toString(), version); + } + else { + return convertEvent(jsonObject, uuid.toString(), version); + } + } + + private JSONArray convertEvents(JSONObject jsonObject, + String uuid, String version) { + JSONArray asArrayEvents = new JSONArray(); + + JSONArray events = jsonObject.getJSONArray(EVENT_LIST); + for (int i = 0; i < events.length(); i++) { + JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i)); + event.put(VES_UNIQUE_ID, uuid + "-" + i); + event.put(VES_VERSION, version); + asArrayEvents.put(overrideEvent(event)); + } + return asArrayEvents; + } + + private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) { + jsonObject.put(VES_UNIQUE_ID, uuid); + jsonObject.put(VES_VERSION, version); + return new JSONArray().put(overrideEvent(jsonObject)); + } + + private JSONObject overrideEvent(JSONObject event) { + JSONObject jsonObject = addCurrentTimeToEvent(event); + if (settings.eventTransformingEnabled()) { + try (FileReader fr = new FileReader(EVENT_TRANSFORM)) { + log.info("parse " + EVENT_TRANSFORM + " file"); + List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); + } catch (IOException e) { + log.error(COULD_NOT_FIND_FILE, e); + throw new ApplicationException(COULD_NOT_FIND_FILE, e); + } + } + if (jsonObject.has(VES_VERSION)) + jsonObject.remove(VES_VERSION); + log.debug("Modified event:" + jsonObject); + return jsonObject; + } + + private JSONObject addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + return event; + } + + private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) { + for (Processor processor : processors) { + //TODO try to remove refection + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + log.info(String.format("functionName==%s | args==%s", functionName, args)); + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } +} |