diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java | 56 |
1 files changed, 24 insertions, 32 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 58d0d2d48..4c9064852 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.impl.events; import io.cloudevents.CloudEvent; import java.util.Map; -import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; @@ -32,6 +31,8 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; /** * EventsPublisher to publish events. @@ -60,17 +61,9 @@ public class EventsPublisher<T> { * @param event message payload */ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { - final CompletableFuture<SendResult<String, CloudEvent>> eventFuture = - cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.whenComplete((result, e) -> { - if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", - result.getRecordMetadata().topic(), result.getProducerRecord().value()); - - } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); - } - }); + final ListenableFuture<SendResult<String, CloudEvent>> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); } /** @@ -83,16 +76,9 @@ public class EventsPublisher<T> { */ @Deprecated(forRemoval = true) public void publishEvent(final String topicName, final String eventKey, final T event) { - final CompletableFuture<SendResult<String, T>> eventFuture = - legacyKafkaEventTemplate.send(topicName, eventKey, event); - eventFuture.whenComplete((result, e) -> { - if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", - result.getRecordMetadata().topic(), result.getProducerRecord().value()); - } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); - } - }); + final ListenableFuture<SendResult<String, T>> eventFuture + = legacyKafkaEventTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); } /** @@ -107,16 +93,8 @@ public class EventsPublisher<T> { final ProducerRecord<String, T> producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final CompletableFuture<SendResult<String, T>> eventFuture = - legacyKafkaEventTemplate.send(producerRecord); - eventFuture.whenComplete((result, ex) -> { - if (ex != null) { - log.error("Unable to publish event to topic : {} due to {}", topicName, ex.getMessage()); - } else { - log.debug("Successfully published event to topic : {} , Event : {}", - result.getRecordMetadata().topic(), result.getProducerRecord().value()); - } - }); + final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); } /** @@ -133,6 +111,20 @@ public class EventsPublisher<T> { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } + private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { + @Override + public void onFailure(final Throwable throwable) { + log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); + } + + @Override + public void onSuccess(final SendResult<String, ?> sendResult) { + log.debug("Successfully published event to topic : {} , Event : {}", + sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); + } + }; + } private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) { final Headers eventHeaders = new RecordHeaders(); |