From 87f0b004fb0b15f3e8fa30d39bdf8ae3310b8743 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Thu, 4 May 2023 11:24:29 +0100 Subject: DMI Data AVC to use kafka headers - POC done keeping AvcEvent schema in mind. - Approach to have header schema per event schema. - Moved the header information from AvcEvent to separate AvcEventHeader schema. - Added Jsr303 annotation support for required field check Issue-ID: CPS-1671 Change-Id: I2e4f969e8ca4f6282d1b9aa5fd52d16174a26084 Signed-off-by: mpriyank --- cps-dependencies/pom.xml | 5 ++ cps-ncmp-events/pom.xml | 5 ++ .../resources/schemas/avc-event-schema-v1.json | 60 ---------------------- .../schemas/dmidataavc/avc-event-headers-v1.json | 50 ++++++++++++++++++ .../schemas/dmidataavc/avc-event-schema-v1.json | 24 +++++++++ .../cps/ncmp/api/impl/events/EventsPublisher.java | 34 +++++++++--- .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 37 +++++++++---- .../ncmp/api/impl/events/avc/AvcEventMapper.java | 11 +--- .../impl/events/avc/AvcEventConsumerSpec.groovy | 39 +++++++++----- .../src/test/resources/sampleAvcInputEvent.json | 7 --- 10 files changed, 166 insertions(+), 106 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/avc-event-schema-v1.json create mode 100644 cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json create mode 100644 cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json diff --git a/cps-dependencies/pom.xml b/cps-dependencies/pom.xml index 70501420a1..e06bbd7a1f 100755 --- a/cps-dependencies/pom.xml +++ b/cps-dependencies/pom.xml @@ -209,6 +209,11 @@ guava 31.1-jre + + javax.validation + validation-api + 2.0.1.Final + diff --git a/cps-ncmp-events/pom.xml b/cps-ncmp-events/pom.xml index 7457eb6e0f..52ca77e936 100644 --- a/cps-ncmp-events/pom.xml +++ b/cps-ncmp-events/pom.xml @@ -35,6 +35,10 @@ com.fasterxml.jackson.core jackson-databind + + javax.validation + validation-api + @@ -47,6 +51,7 @@ org.onap.cps.ncmp.event.model true true + true diff --git a/cps-ncmp-events/src/main/resources/schemas/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/avc-event-schema-v1.json deleted file mode 100644 index 0e9f256639..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/avc-event-schema-v1.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1", - "$ref": "#/definitions/AvcEvent", - "definitions": { - "AvcEvent": { - "description": "The payload for AVC event.", - "type": "object", - "properties": { - "eventId": { - "description": "The unique id identifying the event generated by DMI for this AVC event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id passed by NCMP for this AVC event.", - "type": "string" - }, - "eventTime": { - "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", - "type": "string" - }, - "eventSource": { - "description": "The source of the AVC event.", - "type": "string" - }, - "eventType": { - "description": "The type of the AVC event.", - "type": "string" - }, - "eventSchema": { - "description": "The event schema for AVC events.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The event schema version for AVC events.", - "type": "string" - }, - "event": { - "$ref": "#/definitions/Event" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventTime", - "eventSource", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - }, - "Event": { - "description": "The AVC event content.", - "type": "object", - "existingJavaType": "java.lang.Object", - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json new file mode 100644 index 0000000000..caae82bb23 --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json @@ -0,0 +1,50 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-headers-schema:v1", + "$ref": "#/definitions/AvcEventHeader", + "definitions": { + "AvcEventHeader": { + "description": "The header for AVC event.", + "type": "object", + "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader", + "properties": { + "eventId": { + "description": "The unique id identifying the event generated by DMI for this AVC event.", + "type": "string" + }, + "eventCorrelationId": { + "description": "The request id passed by NCMP for this AVC event.", + "type": "string" + }, + "eventTime": { + "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", + "type": "string" + }, + "eventSource": { + "description": "The source of the AVC event.", + "type": "string" + }, + "eventType": { + "description": "The type of the AVC event.", + "type": "string" + }, + "eventSchema": { + "description": "The event schema for AVC events.", + "type": "string" + }, + "eventSchemaVersion": { + "description": "The event schema version for AVC events.", + "type": "string" + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventType", + "eventSchema", + "eventSchemaVersion" + ], + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json new file mode 100644 index 0000000000..407551f4fd --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json @@ -0,0 +1,24 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1", + "$ref": "#/definitions/AvcEvent", + "definitions": { + "AvcEvent": { + "description": "The payload for AVC event.", + "type": "object", + "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEvent", + "properties": { + "event": { + "description": "The AVC event content.", + "type": "object", + "existingJavaType": "java.lang.Object", + "additionalProperties": false + } + }, + "required": [ + "event" + ], + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbaee..4c84629304 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -40,17 +42,36 @@ public class EventsPublisher { private final KafkaTemplate eventKafkaTemplate; /** - * LCM Event publisher. + * Generic Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ + @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); + } - eventFuture.addCallback(new ListenableFutureCallback<>() { + private ListenableFutureCallback> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); @@ -61,6 +82,7 @@ public class EventsPublisher { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } - }); + }; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 83ad5e5704..3bf02f0b58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,14 +20,19 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,16 +52,28 @@ public class AvcEventConsumer { /** - * Consume the specified event. + * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEvent the event to be consumed and produced. + * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener( - topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) - public void consumeAndForward(final AvcEvent avcEvent) { - log.debug("Consuming AVC event {} ...", avcEvent); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent); - eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent); + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + final String mutatedEventId = UUID.randomUUID().toString(); + mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); + eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), + outgoingAvcEvent); + } + + private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String existingEventId = + (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); + eventHeaders.remove("eventId"); + log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, + mutatedEventId); + eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java index 113da0deb9..8246ed4802 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java @@ -20,11 +20,8 @@ package org.onap.cps.ncmp.api.impl.events.avc; -import java.util.UUID; import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; /** @@ -33,12 +30,6 @@ import org.onap.cps.ncmp.event.model.AvcEvent; @Mapper(componentModel = "spring") public interface AvcEventMapper { - @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId") AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - @Named("avcEventId") - static String getAvcEventId(String eventId) { - return UUID.randomUUID().toString(); - } - } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index d57527a454..5f54bbe3dd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* - * ============LICENSE_START======================================================= - * Copyright (c) 2023 Nordix Foundation. + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,17 +21,20 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.header.internals.RecordHeader import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.event.model.AvcEvent +import org.onap.cps.ncmp.events.avc.v1.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext +import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -63,23 +66,33 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + and: 'event has header information' + def consumerRecord = new ConsumerRecord(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) + consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) + consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) when: 'the event is consumed' - acvEventConsumer.consumeAndForward(testEventSent) + acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent) - and: 'consumed forwarded NCMP event id differs from DMI event id' - assert testEventSent.eventId != convertedAvcEvent.getEventId() - and: 'correlation id matches' - assert testEventSent.eventCorrelationId == convertedAvcEvent.getEventCorrelationId() - and: 'timestamps match' - assert testEventSent.eventTime == convertedAvcEvent.getEventTime() - and: 'target matches' - assert testEventSent.eventSource == convertedAvcEvent.getEventSource() + def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + and: 'we have correct headers forwarded where correlation id matches' + record.headers().forEach(header -> { + if (header.key().equals('eventCorrelationId')) { + assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' + } + }) + and: 'event id differs(as per requirement) between consumed and forwarded' + record.headers().forEach(header -> { + if (header.key().equals('eventId')) { + assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' + } + }) + and: 'the event payload still matches' + assert testEventSent == convertedAvcEvent } } \ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index bda2b4e638..de8a523c0f 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,11 +1,4 @@ { - "eventId": "4cb32729-85e3-44d1-aa6e-c923b9b059a5", - "eventCorrelationId": "68f15800-8ed4-4bae-9e53-27a9e03e1911", - "eventTime": "2022-12-12T14:29:23.876+0000", - "eventSource": "NCMP", - "eventType": "org.onap.cps.ncmp.event.model.AvcEvent", - "eventSchema": "urn:cps:org.onap.cps.ncmp.event.model.AvcEvent", - "eventSchemaVersion": "v1", "event": { "payload": "Hello world!" } -- cgit 1.2.3-korg