diff options
Diffstat (limited to 'src/main')
3 files changed, 95 insertions, 73 deletions
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 1ec918c9..81c463dc 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -49,17 +49,17 @@ public class EventSender { setLoggingContext(vesEvent); streamIdToDmaapIds.get(vesEvent.getStreamId()) .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(streamIds -> sendEventsToStreams(vesEvent, streamIds)); + .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); log.debug("Message published" + vesEvent.asJsonObject()); } log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); metriclog.info("EVENT_PUBLISH_END"); } - private void sendEventsToStreams(VesEvent vesEvent, String[] streamIdList) { - for (String streamId : streamIdList) { - log.info("Invoking publisher for streamId/domain:" + streamId); - eventPublisher.sendEvent(vesEvent.asJsonObject(), streamId); + private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { + for (String dmaapId : dmaapIds) { + log.info("Invoking publisher for streamId/domain:" + dmaapId); + eventPublisher.sendEvent(vesEvent, dmaapId); } } diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java index 6c9a8ee2..8e2db80e 100644 --- a/src/main/java/org/onap/dcae/common/model/VesEvent.java +++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java @@ -32,14 +32,15 @@ import org.json.JSONObject; */ public class VesEvent { - private static final String EVENT_LITERAL = "event"; + public static final String VES_UNIQUE_ID = "VESuniqueId"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final String DOMAIN = "domain"; private static final String STND_DEFINED_NAMESPACE = "stndDefinedNamespace"; private static final String STND_DEFINED_DOMAIN = "stndDefined"; private static final String STND_DEFINED_FIELDS = "stndDefinedFields"; private static final String SCHEMA_REFERENCE = "schemaReference"; + private static final String EVENT = "event"; + private static final String PARTITION_KEY = "sourceName"; private final JSONObject event; @@ -47,6 +48,45 @@ public class VesEvent { this.event = event; } + public JsonNode asJsonNode() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readTree(event.toString()); + } + + /** + * Returns VES event in form of JSON object. + * + * @return event in form of json Object + */ + public JSONObject asJsonObject() { + return new JSONObject(event.toString()); + } + + /** + * Returns Domain name from VES event. + * + * @return domain + */ + public String getDomain() { + return getEventHeader().getString(DOMAIN); + } + + /** + * Returns event primary key. + * @return a primary key + */ + public String getPK() { + return event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); + } + + /** + * Returns schema reference. + * @return a schema reference. + */ + public String getSchemaReference() { + return getStndDefinedFields().getString(SCHEMA_REFERENCE); + } + /** * Returns stream ID from VES event. * @@ -63,21 +103,40 @@ public class VesEvent { } /** - * Returns Domain name from VES event. + * Returns unique ID of VES event. * - * @return domain + * @return unique ID */ - public String getDomain() { - return getEventHeader().getString(DOMAIN); + public Object getUniqueId() { + return event.get(VES_UNIQUE_ID); } - public String getSchemaReference() { - return getStndDefinedFields().getString(SCHEMA_REFERENCE); + /** + * Checks if type of event is same as given in paramaters. + * + * @param type name that will be compared with event type + * @return true or false depending if type given in parameter is same as VES event type + */ + public boolean hasType(String type) { + return this.event.has(type); + } + + /** + * Remove Json element from event by key. + * @param key + */ + public void removeElement(String key) { + this.event.remove(key); + } + + @Override + public String toString() { + return event.toString(); } private JSONObject getStndDefinedFields() { return event - .getJSONObject(EVENT_LITERAL) + .getJSONObject(EVENT) .getJSONObject(STND_DEFINED_FIELDS); } @@ -97,44 +156,11 @@ public class VesEvent { private JSONObject getEventHeader() { return event - .getJSONObject(EVENT_LITERAL) + .getJSONObject(EVENT) .getJSONObject(COMMON_EVENT_HEADER); } private boolean isStdDefinedDomain(String domain) { return domain.equals(STND_DEFINED_DOMAIN); } - - /** - * Returns unique ID of VES event. - * - * @return unique ID - */ - public Object getUniqueId() { - return event.get(VES_UNIQUE_ID); - } - - /** - * Returns VES event in form of JSON object. - * - * @return event in form of json Object - */ - public JSONObject asJsonObject() { - return new JSONObject(event.toString()); - } - - public JsonNode asJsonNode() throws JsonProcessingException { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readTree(event.toString()); - } - - /** - * Checks if type of event is same as given in paramaters. - * - * @param type name that will be compared with event type - * @return true or false depending if type given in parameter is same as VES event type - */ - public boolean hasType(String type) { - return this.event.has(type); - } } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 4e9aabc7..876c391b 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -27,8 +27,8 @@ import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import org.json.JSONObject; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.model.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +41,6 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; */ public class DMaaPEventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final String EVENT = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); @@ -57,45 +53,45 @@ public class DMaaPEventPublisher { this(new DMaaPPublishersCache(dMaaPConfig)); } - public void sendEvent(JSONObject event, String domain) { - clearVesUniqueIdFromEvent(event); - publishersCache.getPublisher(domain) + public void sendEvent(VesEvent vesEvent, String dmaapId){ + clearVesUniqueIdFromEvent(vesEvent); + publishersCache.getPublisher(dmaapId) .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) + .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); } - private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) + .onFailure(exc -> closePublisher(event, dmaapId, exc)); } - private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) throws IOException { - - String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); - int pendingMsgs = publisher.send(pk, event.toString()); + + String pk = event.getPK(); + int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); log.info(infoMsg); outputLogger.info(infoMsg); } - private void closePublisher(JSONObject event, String domain, Throwable e) { + private void closePublisher(VesEvent event, String dmaapId, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); - publishersCache.closePublisherFor(domain); + event, dmaapId), e); + publishersCache.closePublisherFor(dmaapId); } - private void clearVesUniqueIdFromEvent(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); + private void clearVesUniqueIdFromEvent(VesEvent event) { + if (event.hasType(VesEvent.VES_UNIQUE_ID)) { + String uuid = event.getUniqueId().toString(); LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); + event.removeElement(VesEvent.VES_UNIQUE_ID); } } } |