aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java19
1 files changed, 10 insertions, 9 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
index fd9b3ae1..a0ee3bfb 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
@@ -21,20 +21,21 @@
package org.onap.dcae.commonFunction.event.publishing;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
import io.vavr.control.Try;
-import java.io.IOException;
import org.json.JSONObject;
import org.onap.dcae.commonFunction.VESLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher {
public void sendEvent(JSONObject event, String domain) {
clearVesUniqueIdFromEvent(event);
publishersCache.getPublisher(domain)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
- .forEach(publisher -> sendEvent(event, domain, publisher));
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
}
@Override
@@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher {
private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
Try.run(() -> uncheckedSendEvent(event, domain, publisher))
- .onFailure(exc -> closePublisher(event, domain, exc));
+ .onFailure(exc -> closePublisher(event, domain, exc));
}
private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
- throws IOException {
+ throws IOException {
int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
@@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher {
private void closePublisher(JSONObject event, String domain, Throwable e) {
log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, domain), e);
+ event, domain), e);
publishersCache.closePublisherFor(domain);
}