summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java21
1 files changed, 15 insertions, 6 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
index ce362019..55f24a65 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020-2021 Wipro Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +22,11 @@
package org.onap.slice.analysis.ms.dmaap;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.gson.JsonPrimitive;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
import java.io.IOException;
@@ -30,21 +35,25 @@ import java.io.IOException;
*/
public class NotificationProducer {
- private CambriaBatchingPublisher cambriaBatchingPublisher;
+ private MessageRouterPublisher publisher;
+ private MessageRouterPublishRequest request;
/**
* Parameterized constructor.
*/
- public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
+ public NotificationProducer(MessageRouterPublisher publisher, MessageRouterPublishRequest request) {
super();
- this.cambriaBatchingPublisher = cambriaBatchingPublisher;
+ this.publisher = publisher;
+ this.request = request;
}
/**
* sends notification to dmaap.
*/
- public int sendNotification(String msg) throws IOException {
- return cambriaBatchingPublisher.send("", msg);
+ public void sendNotification(String msg) throws IOException {
+ Flux<JsonPrimitive> singleMessage = Flux.just(msg).map(JsonPrimitive::new);
+ Flux<MessageRouterPublishResponse> result = this.publisher.put(request, singleMessage);
+ result.then().block();
}
}