diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java | 46 |
1 files changed, 21 insertions, 25 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 4e9aabc7..876c391b 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -27,8 +27,8 @@ import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import org.json.JSONObject; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.model.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +41,6 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; */ public class DMaaPEventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final String EVENT = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); @@ -57,45 +53,45 @@ public class DMaaPEventPublisher { this(new DMaaPPublishersCache(dMaaPConfig)); } - public void sendEvent(JSONObject event, String domain) { - clearVesUniqueIdFromEvent(event); - publishersCache.getPublisher(domain) + public void sendEvent(VesEvent vesEvent, String dmaapId){ + clearVesUniqueIdFromEvent(vesEvent); + publishersCache.getPublisher(dmaapId) .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) + .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); } - private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) + .onFailure(exc -> closePublisher(event, dmaapId, exc)); } - private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) throws IOException { - - String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); - int pendingMsgs = publisher.send(pk, event.toString()); + + String pk = event.getPK(); + int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); log.info(infoMsg); outputLogger.info(infoMsg); } - private void closePublisher(JSONObject event, String domain, Throwable e) { + private void closePublisher(VesEvent event, String dmaapId, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); - publishersCache.closePublisherFor(domain); + event, dmaapId), e); + publishersCache.closePublisherFor(dmaapId); } - private void clearVesUniqueIdFromEvent(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); + private void clearVesUniqueIdFromEvent(VesEvent event) { + if (event.hasType(VesEvent.VES_UNIQUE_ID)) { + String uuid = event.getUniqueId().toString(); LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); + event.removeElement(VesEvent.VES_UNIQUE_ID); } } } |