summaryrefslogtreecommitdiffstats
path: root/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java')
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java39
1 files changed, 12 insertions, 27 deletions
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
index c5be6cc0..34255431 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
@@ -21,47 +21,32 @@
package org.onap.dcaegen2.kpi.dmaap;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+
import java.io.IOException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.gson.JsonPrimitive;
-import reactor.core.publisher.Flux;
/**
* Produces Notification on DMAAP events.
*/
public class NotificationProducer {
- private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class);
- private MessageRouterPublisher messageRouterPublisher;
- private MessageRouterPublishRequest messageRouterPublishRequest;
-
+
+ private CambriaBatchingPublisher cambriaBatchingPublisher;
+
/**
* Parameterized constructor.
*/
- public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) {
+ public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
super();
- this.messageRouterPublisher = messageRouterPublisher;
- this.messageRouterPublishRequest = messageRouterPublishRequest;
+ this.cambriaBatchingPublisher = cambriaBatchingPublisher;
}
/**
* sends notification to dmaap.
*/
- public void sendNotification(String msg) throws IOException {
- Flux.just(1, 2, 3)
- .map(JsonPrimitive::new)
- .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input))
- .subscribe(resp -> {
- if (resp.successful()) {
- logger.debug("Sent a batch of messages to the MR");
- } else {
- logger.warn("Message sending has failed: {}", resp.failReason());
- }
- },
- ex -> {
- logger.warn("An unexpected error while sending messages to DMaaP", ex);
- });
+ public int sendNotification(String msg) throws IOException {
+
+ return cambriaBatchingPublisher.send("", msg);
+
}
+
}