diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 4c9064852..49e455e58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events; import io.cloudevents.CloudEvent; import java.util.Map; +import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; @@ -31,8 +32,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; /** * EventsPublisher to publish events. @@ -61,9 +60,17 @@ public class EventsPublisher<T> { * @param event message payload */ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { - final ListenableFuture<SendResult<String, CloudEvent>> eventFuture - = cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture<SendResult<String, CloudEvent>> eventFuture = + cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } + }); } /** @@ -76,9 +83,16 @@ public class EventsPublisher<T> { */ @Deprecated(forRemoval = true) public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture<SendResult<String, T>> eventFuture - = legacyKafkaEventTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture<SendResult<String, T>> eventFuture = + legacyKafkaEventTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } + }); } /** @@ -93,8 +107,16 @@ public class EventsPublisher<T> { final ProducerRecord<String, T> producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture<SendResult<String, T>> eventFuture = + legacyKafkaEventTemplate.send(producerRecord); + eventFuture.whenComplete((result, ex) -> { + if (ex != null) { + log.error("Unable to publish event to topic : {} due to {}", topicName, ex.getMessage()); + } else { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } + }); } /** @@ -111,21 +133,6 @@ public class EventsPublisher<T> { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) { - return new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable throwable) { - log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); - } - - @Override - public void onSuccess(final SendResult<String, ?> sendResult) { - log.debug("Successfully published event to topic : {} , Event : {}", - sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); - } - }; - } - private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) { final Headers eventHeaders = new RecordHeaders(); eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); |