From 87f0b004fb0b15f3e8fa30d39bdf8ae3310b8743 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Thu, 4 May 2023 11:24:29 +0100 Subject: DMI Data AVC to use kafka headers - POC done keeping AvcEvent schema in mind. - Approach to have header schema per event schema. - Moved the header information from AvcEvent to separate AvcEventHeader schema. - Added Jsr303 annotation support for required field check Issue-ID: CPS-1671 Change-Id: I2e4f969e8ca4f6282d1b9aa5fd52d16174a26084 Signed-off-by: mpriyank --- .../cps/ncmp/api/impl/events/EventsPublisher.java | 34 +++++++++++++++---- .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 37 ++++++++++++++------ .../ncmp/api/impl/events/avc/AvcEventMapper.java | 11 +----- .../impl/events/avc/AvcEventConsumerSpec.groovy | 39 ++++++++++++++-------- .../src/test/resources/sampleAvcInputEvent.json | 7 ---- 5 files changed, 82 insertions(+), 46 deletions(-) (limited to 'cps-ncmp-service') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbaee..4c84629304 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -40,17 +42,36 @@ public class EventsPublisher { private final KafkaTemplate eventKafkaTemplate; /** - * LCM Event publisher. + * Generic Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ + @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); + } - eventFuture.addCallback(new ListenableFutureCallback<>() { + private ListenableFutureCallback> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); @@ -61,6 +82,7 @@ public class EventsPublisher { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } - }); + }; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 83ad5e5704..3bf02f0b58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,14 +20,19 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,16 +52,28 @@ public class AvcEventConsumer { /** - * Consume the specified event. + * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEvent the event to be consumed and produced. + * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener( - topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) - public void consumeAndForward(final AvcEvent avcEvent) { - log.debug("Consuming AVC event {} ...", avcEvent); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent); - eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent); + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + final String mutatedEventId = UUID.randomUUID().toString(); + mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); + eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), + outgoingAvcEvent); + } + + private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String existingEventId = + (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); + eventHeaders.remove("eventId"); + log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, + mutatedEventId); + eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java index 113da0deb9..8246ed4802 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java @@ -20,11 +20,8 @@ package org.onap.cps.ncmp.api.impl.events.avc; -import java.util.UUID; import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; /** @@ -33,12 +30,6 @@ import org.onap.cps.ncmp.event.model.AvcEvent; @Mapper(componentModel = "spring") public interface AvcEventMapper { - @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId") AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - @Named("avcEventId") - static String getAvcEventId(String eventId) { - return UUID.randomUUID().toString(); - } - } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index d57527a454..5f54bbe3dd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* - * ============LICENSE_START======================================================= - * Copyright (c) 2023 Nordix Foundation. + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,17 +21,20 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.header.internals.RecordHeader import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.event.model.AvcEvent +import org.onap.cps.ncmp.events.avc.v1.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext +import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -63,23 +66,33 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + and: 'event has header information' + def consumerRecord = new ConsumerRecord(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) + consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) + consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) when: 'the event is consumed' - acvEventConsumer.consumeAndForward(testEventSent) + acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent) - and: 'consumed forwarded NCMP event id differs from DMI event id' - assert testEventSent.eventId != convertedAvcEvent.getEventId() - and: 'correlation id matches' - assert testEventSent.eventCorrelationId == convertedAvcEvent.getEventCorrelationId() - and: 'timestamps match' - assert testEventSent.eventTime == convertedAvcEvent.getEventTime() - and: 'target matches' - assert testEventSent.eventSource == convertedAvcEvent.getEventSource() + def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + and: 'we have correct headers forwarded where correlation id matches' + record.headers().forEach(header -> { + if (header.key().equals('eventCorrelationId')) { + assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' + } + }) + and: 'event id differs(as per requirement) between consumed and forwarded' + record.headers().forEach(header -> { + if (header.key().equals('eventId')) { + assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' + } + }) + and: 'the event payload still matches' + assert testEventSent == convertedAvcEvent } } \ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index bda2b4e638..de8a523c0f 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,11 +1,4 @@ { - "eventId": "4cb32729-85e3-44d1-aa6e-c923b9b059a5", - "eventCorrelationId": "68f15800-8ed4-4bae-9e53-27a9e03e1911", - "eventTime": "2022-12-12T14:29:23.876+0000", - "eventSource": "NCMP", - "eventType": "org.onap.cps.ncmp.event.model.AvcEvent", - "eventSchema": "urn:cps:org.onap.cps.ncmp.event.model.AvcEvent", - "eventSchemaVersion": "v1", "event": { "payload": "Hello world!" } -- cgit 1.2.3-korg