From 87f0b004fb0b15f3e8fa30d39bdf8ae3310b8743 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Thu, 4 May 2023 11:24:29 +0100 Subject: DMI Data AVC to use kafka headers - POC done keeping AvcEvent schema in mind. - Approach to have header schema per event schema. - Moved the header information from AvcEvent to separate AvcEventHeader schema. - Added Jsr303 annotation support for required field check Issue-ID: CPS-1671 Change-Id: I2e4f969e8ca4f6282d1b9aa5fd52d16174a26084 Signed-off-by: mpriyank --- .../cps/ncmp/api/impl/events/EventsPublisher.java | 34 ++++++++++++++++---- .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 37 ++++++++++++++++------ .../ncmp/api/impl/events/avc/AvcEventMapper.java | 11 +------ 3 files changed, 56 insertions(+), 26 deletions(-) (limited to 'cps-ncmp-service/src/main') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbae..4c8462930 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -40,17 +42,36 @@ public class EventsPublisher { private final KafkaTemplate eventKafkaTemplate; /** - * LCM Event publisher. + * Generic Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ + @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); + } - eventFuture.addCallback(new ListenableFutureCallback<>() { + private ListenableFutureCallback> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); @@ -61,6 +82,7 @@ public class EventsPublisher { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } - }); + }; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 83ad5e570..3bf02f0b5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,14 +20,19 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,16 +52,28 @@ public class AvcEventConsumer { /** - * Consume the specified event. + * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEvent the event to be consumed and produced. + * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener( - topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) - public void consumeAndForward(final AvcEvent avcEvent) { - log.debug("Consuming AVC event {} ...", avcEvent); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent); - eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent); + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + final String mutatedEventId = UUID.randomUUID().toString(); + mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); + eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), + outgoingAvcEvent); + } + + private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String existingEventId = + (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); + eventHeaders.remove("eventId"); + log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, + mutatedEventId); + eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java index 113da0deb..8246ed480 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java @@ -20,11 +20,8 @@ package org.onap.cps.ncmp.api.impl.events.avc; -import java.util.UUID; import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; /** @@ -33,12 +30,6 @@ import org.onap.cps.ncmp.event.model.AvcEvent; @Mapper(componentModel = "spring") public interface AvcEventMapper { - @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId") AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - @Named("avcEventId") - static String getAvcEventId(String eventId) { - return UUID.randomUUID().toString(); - } - } -- cgit 1.2.3-korg