summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java23
1 files changed, 23 insertions, 0 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 4c8462930..b0b091a2f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -20,13 +20,16 @@
package org.onap.cps.ncmp.api.impl.events;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -70,6 +73,20 @@ public class EventsPublisher<T> {
eventFuture.addCallback(handleCallback(topicName));
}
+ /**
+ * Generic Event Publisher with headers.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param eventHeaders map of event headers
+ * @param event message payload
+ */
+ public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+ final T event) {
+
+ publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+ }
+
private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
@@ -85,4 +102,10 @@ public class EventsPublisher<T> {
};
}
+ private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
+ final Headers eventHeaders = new RecordHeaders();
+ eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
+ return eventHeaders;
+ }
+
}