diff options
Diffstat (limited to 'components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java')
-rw-r--r-- | components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java | 39 |
1 files changed, 12 insertions, 27 deletions
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java index c5be6cc0..34255431 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java @@ -21,47 +21,32 @@ package org.onap.dcaegen2.kpi.dmaap; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; + import java.io.IOException; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.gson.JsonPrimitive; -import reactor.core.publisher.Flux; /** * Produces Notification on DMAAP events. */ public class NotificationProducer { - private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class); - private MessageRouterPublisher messageRouterPublisher; - private MessageRouterPublishRequest messageRouterPublishRequest; - + + private CambriaBatchingPublisher cambriaBatchingPublisher; + /** * Parameterized constructor. */ - public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) { + public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) { super(); - this.messageRouterPublisher = messageRouterPublisher; - this.messageRouterPublishRequest = messageRouterPublishRequest; + this.cambriaBatchingPublisher = cambriaBatchingPublisher; } /** * sends notification to dmaap. */ - public void sendNotification(String msg) throws IOException { - Flux.just(1, 2, 3) - .map(JsonPrimitive::new) - .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input)) - .subscribe(resp -> { - if (resp.successful()) { - logger.debug("Sent a batch of messages to the MR"); - } else { - logger.warn("Message sending has failed: {}", resp.failReason()); - } - }, - ex -> { - logger.warn("An unexpected error while sending messages to DMaaP", ex); - }); + public int sendNotification(String msg) throws IOException { + + return cambriaBatchingPublisher.send("", msg); + } + } |