aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java46
1 files changed, 21 insertions, 25 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 4e9aabc7..876c391b 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -27,8 +27,8 @@ import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
import io.vavr.control.Try;
-import org.json.JSONObject;
import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.model.VesEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +41,6 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
*/
public class DMaaPEventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
- private static final String VES_UNIQUE_ID = "VESuniqueId";
- private static final String EVENT = "event";
- private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- private static final String PARTITION_KEY = "sourceName";
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
@@ -57,45 +53,45 @@ public class DMaaPEventPublisher {
this(new DMaaPPublishersCache(dMaaPConfig));
}
- public void sendEvent(JSONObject event, String domain) {
- clearVesUniqueIdFromEvent(event);
- publishersCache.getPublisher(domain)
+ public void sendEvent(VesEvent vesEvent, String dmaapId){
+ clearVesUniqueIdFromEvent(vesEvent);
+ publishersCache.getPublisher(dmaapId)
.onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
- .forEach(publisher -> sendEvent(event, domain, publisher));
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
+ .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
}
- private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
- Try.run(() -> uncheckedSendEvent(event, domain, publisher))
- .onFailure(exc -> closePublisher(event, domain, exc));
+ private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
+ .onFailure(exc -> closePublisher(event, dmaapId, exc));
}
- private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
throws IOException {
-
- String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
- int pendingMsgs = publisher.send(pk, event.toString());
+
+ String pk = event.getPK();
+ int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
}
- String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
log.info(infoMsg);
outputLogger.info(infoMsg);
}
- private void closePublisher(JSONObject event, String domain, Throwable e) {
+ private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, domain), e);
- publishersCache.closePublisherFor(domain);
+ event, dmaapId), e);
+ publishersCache.closePublisherFor(dmaapId);
}
- private void clearVesUniqueIdFromEvent(JSONObject event) {
- if (event.has(VES_UNIQUE_ID)) {
- String uuid = event.get(VES_UNIQUE_ID).toString();
+ private void clearVesUniqueIdFromEvent(VesEvent event) {
+ if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
+ String uuid = event.getUniqueId().toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
log.debug("Removing VESuniqueid object from event");
- event.remove(VES_UNIQUE_ID);
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
}
}
}