diff options
author | Toine Siebelink <toine.siebelink@est.tech> | 2023-05-10 14:20:09 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2023-05-10 14:20:09 +0000 |
commit | e626c9661fd88a585b50dafab5f5542784690143 (patch) | |
tree | 87fcad6f6708553e72d27323a34711cb7ce80f96 /cps-ncmp-service/src/main/java | |
parent | 9b829443ebf53885730c3786287568742c33582e (diff) | |
parent | 87f0b004fb0b15f3e8fa30d39bdf8ae3310b8743 (diff) |
Merge "DMI Data AVC to use kafka headers"
Diffstat (limited to 'cps-ncmp-service/src/main/java')
3 files changed, 56 insertions, 26 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbaee..4c84629304 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -40,17 +42,36 @@ public class EventsPublisher<T> { private final KafkaTemplate<String, T> eventKafkaTemplate; /** - * LCM Event publisher. + * Generic Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ + @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture<SendResult<String, T>> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord<String, T> producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); + } - eventFuture.addCallback(new ListenableFutureCallback<>() { + private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); @@ -61,6 +82,7 @@ public class EventsPublisher<T> { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } - }); + }; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 83ad5e5704..3bf02f0b58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,14 +20,19 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,16 +52,28 @@ public class AvcEventConsumer { /** - * Consume the specified event. + * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEvent the event to be consumed and produced. + * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener( - topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) - public void consumeAndForward(final AvcEvent avcEvent) { - log.debug("Consuming AVC event {} ...", avcEvent); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent); - eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent); + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) + public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) { + log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + final String mutatedEventId = UUID.randomUUID().toString(); + mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); + eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), + outgoingAvcEvent); + } + + private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String existingEventId = + (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); + eventHeaders.remove("eventId"); + log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, + mutatedEventId); + eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java index 113da0deb9..8246ed4802 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java @@ -20,11 +20,8 @@ package org.onap.cps.ncmp.api.impl.events.avc; -import java.util.UUID; import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; /** @@ -33,12 +30,6 @@ import org.onap.cps.ncmp.event.model.AvcEvent; @Mapper(componentModel = "spring") public interface AvcEventMapper { - @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId") AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - @Named("avcEventId") - static String getAvcEventId(String eventId) { - return UUID.randomUUID().toString(); - } - } |