diff options
author | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-07-31 08:18:03 +0200 |
---|---|---|
committer | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-08-01 09:56:00 +0200 |
commit | fc073344d4c0eb8a28bf34c07a8439176cf846ca (patch) | |
tree | 01f5b4789c3d9369eaebb54a9f910a9fa400af1f /src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java | |
parent | d12cd3525284cc41414d8fdae09e2ffbc03a1fbb (diff) |
Replace nsaCore library with Spring
Change-Id: I2227939a67a2cbba2d392136d49ef4419600d186
Issue-ID: DCAEGEN2-602
Signed-off-by: PawelSzalapski <pawel.szalapski@nokia.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java index fd9b3ae1..a0ee3bfb 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java @@ -21,20 +21,21 @@ package org.onap.dcae.commonFunction.event.publishing; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import java.io.IOException; import org.json.JSONObject; import org.onap.dcae.commonFunction.VESLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher { public void sendEvent(JSONObject event, String domain) { clearVesUniqueIdFromEvent(event); publishersCache.getPublisher(domain) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); } @Override @@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher { private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + .onFailure(exc -> closePublisher(event, domain, exc)); } private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) - throws IOException { + throws IOException { int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); @@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher { private void closePublisher(JSONObject event, String domain, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); + event, domain), e); publishersCache.closePublisherFor(domain); } |