From 3032261ec743bde6eb136cd2030774c3dc358fa9 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Fri, 16 Jun 2023 15:55:52 +0100 Subject: DMI Data AVC to cloud events - DMI Data AVC to be consumed as CloudEvents now - Removed the schema header as it is taken care by cloudevent headers - Implemented naming and packaging comments on the schema - Test cases refactoring Issue-ID: CPS-1719 Change-Id: I9864f90446720fe903fb3c1502d86035d886751d Signed-off-by: mpriyank --- .../schemas/dmidataavc/avc-event-header-v1.json | 50 ---------- .../schemas/dmidataavc/avc-event-schema-1.0.0.json | 106 ++++++++++++++++++++ .../schemas/dmidataavc/avc-event-schema-v1.json | 107 --------------------- .../cps/ncmp/api/impl/events/EventsPublisher.java | 17 +++- .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 35 ++----- .../ncmp/api/impl/events/avc/AvcEventMapper.java | 35 ------- .../impl/events/avc/AvcEventConsumerSpec.groovy | 53 +++++----- .../src/test/resources/sampleAvcInputEvent.json | 2 +- 8 files changed, 157 insertions(+), 248 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json create mode 100644 cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json delete mode 100644 cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json deleted file mode 100644 index ea1e617c8..000000000 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-header-schema:v1", - "$ref": "#/definitions/AvcEventHeader", - "definitions": { - "AvcEventHeader": { - "description": "The header for AVC event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader", - "properties": { - "eventId": { - "description": "The unique id identifying the event generated by DMI for this AVC event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id passed by NCMP for this AVC event.", - "type": "string" - }, - "eventTime": { - "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", - "type": "string" - }, - "eventSource": { - "description": "The source of the AVC event.", - "type": "string" - }, - "eventType": { - "description": "The type of the AVC event.", - "type": "string" - }, - "eventSchema": { - "description": "The event schema for AVC events.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The event schema version for AVC events.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json new file mode 100644 index 000000000..a5bed939b --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json @@ -0,0 +1,106 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:1.0.0", + "$ref": "#/definitions/AvcEvent", + "definitions": { + "Edit": { + "additionalProperties": false, + "properties": { + "edit-id": { + "type": "string" + }, + "operation": { + "type": "string" + }, + "target": { + "type": "string" + }, + "value": { + "$ref": "#/definitions/Value" + } + }, + "required": [ + "edit-id", + "operation", + "target" + ] + }, + "Value": { + "type": "object", + "additionalProperties": false, + "properties": { + "attributes": { + "type": "array", + "items": { + "type": "object", + "existingJavaType": "java.util.Map", + "additionalProperties": false, + "properties": { + "isHoAllowed": { + "type": "boolean" + } + } + } + } + } + }, + "AvcEvent": { + "description": "The payload for AVC event.", + "type": "object", + "javaType": "org.onap.cps.ncmp.events.avc1_0_0.AvcEvent", + "properties": { + "data": { + "description": "The AVC event content compliant with RFC8641 format", + "type": "object", + "additionalProperties": false, + "properties": { + "push-change-update": { + "type": "object", + "additionalProperties": false, + "properties": { + "datastore-changes": { + "type": "object", + "additionalProperties": false, + "properties": { + "ietf-yang-patch:yang-patch": { + "type": "object", + "additionalProperties": false, + "properties": { + "patch-id": { + "type": "string" + }, + "edit": { + "type": "array", + "items": { + "$ref": "#/definitions/Edit" + } + } + }, + "required": [ + "patch-id", + "edit" + ] + } + }, + "required": [ + "ietf-yang-patch:yang-patch" + ] + } + }, + "required": [ + "datastore-changes" + ] + } + }, + "required": [ + "push-change-update" + ] + } + }, + "required": [ + "data" + ], + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json deleted file mode 100644 index 7e975c9b9..000000000 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json +++ /dev/null @@ -1,107 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1", - "$ref": "#/definitions/AvcEvent", - "definitions": { - "Edit": { - "javaType": "org.onap.cps.ncmp.events.avc.v1.Edit", - "additionalProperties": false, - "properties": { - "edit-id": { - "type": "string" - }, - "operation": { - "type": "string" - }, - "target": { - "type": "string" - }, - "value": { - "$ref": "#/definitions/Value" - } - }, - "required": [ - "edit-id", - "operation", - "target" - ] - }, - "Value": { - "type": "object", - "additionalProperties": false, - "properties": { - "attributes": { - "type": "array", - "items": { - "type": "object", - "existingJavaType": "java.util.Map", - "additionalProperties": false, - "properties": { - "isHoAllowed": { - "type": "boolean" - } - } - } - } - } - }, - "AvcEvent": { - "description": "The payload for AVC event.", - "type": "object", - "javaType": "org.onap.cps.ncmp.events.avc.v1.AvcEvent", - "properties": { - "event": { - "description": "The AVC event content compliant with RFC8641 format", - "type": "object", - "additionalProperties": false, - "properties": { - "push-change-update": { - "type": "object", - "additionalProperties": false, - "properties": { - "datastore-changes": { - "type": "object", - "additionalProperties": false, - "properties": { - "ietf-yang-patch:yang-patch": { - "type": "object", - "additionalProperties": false, - "properties": { - "patch-id": { - "type": "string" - }, - "edit": { - "type": "array", - "items": { - "$ref": "#/definitions/Edit" - } - } - }, - "required": [ - "patch-id", - "edit" - ] - } - }, - "required": [ - "ietf-yang-patch:yang-patch" - ] - } - }, - "required": [ - "datastore-changes" - ] - } - }, - "required": [ - "push-change-update" - ] - } - }, - "required": [ - "event" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 7b28b4cd5..e61e7729b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -50,6 +50,19 @@ public class EventsPublisher { private final KafkaTemplate cloudEventKafkaTemplate; + /** + * Generic CloudEvent publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final ListenableFuture> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + /** * Generic Event publisher. * @@ -95,7 +108,7 @@ public class EventsPublisher { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback> handleCallback(final String topicName) { + private ListenableFutureCallback> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { @@ -103,7 +116,7 @@ public class EventsPublisher { } @Override - public void onSuccess(final SendResult sendResult) { + public void onSuccess(final SendResult sendResult) { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index f37497abe..b5ca176d1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,19 +20,17 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,34 +45,19 @@ public class AvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher eventsPublisher; - private final AvcEventMapper avcEventMapper; - + private final EventsPublisher eventsPublisher; /** * Incoming AvcEvent in the form of Consumer Record. * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) - public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + @KafkaListener(topics = "${app.dmi.cm-events.topic}") + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); - final String mutatedEventId = UUID.randomUUID().toString(); - mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); - eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), - outgoingAvcEvent); - } - - private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { - final String eventId = "eventId"; - final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); - eventHeaders.remove(eventId); - log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, - mutatedEventId); - eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); - + final String newEventId = UUID.randomUUID().toString(); + final CloudEvent outgoingAvcEvent = + CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java deleted file mode 100644 index 8246ed480..000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avc; - -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; - - -/** - * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. - */ -@Mapper(componentModel = "spring") -public interface AvcEventMapper { - - AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 3dffac714..4a9e3ee81 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -21,21 +21,23 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.avc.v1.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext -import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -46,52 +48,49 @@ import java.time.Duration class AvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class) + EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper) + AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher) @Autowired JsonObjectMapper jsonObjectMapper - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription on a topic' def cmEventsTopicName = 'cm-events' acvEventConsumer.cmEventsTopicName = cmEventsTopicName - legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List) + cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('sample-eventid') + .withType('sample-test-type') + .withSource(URI.create('sample-test-source')) + .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) - consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) + def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) when: 'the event is consumed' acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' - def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) + def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + def cloudevent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' - record.headers().forEach(header -> { - if (header.key().equals('eventCorrelationId')) { - assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' - record.headers().forEach(header -> { - if (header.key().equals('eventId')) { - assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid' and: 'the event payload still matches' assert testEventSent == convertedAvcEvent } diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index 569343fed..5b297c86c 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,5 +1,5 @@ { - "event":{ + "data":{ "push-change-update":{ "datastore-changes":{ "ietf-yang-patch:yang-patch":{ -- cgit 1.2.3-korg