From 3032261ec743bde6eb136cd2030774c3dc358fa9 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Fri, 16 Jun 2023 15:55:52 +0100 Subject: DMI Data AVC to cloud events - DMI Data AVC to be consumed as CloudEvents now - Removed the schema header as it is taken care by cloudevent headers - Implemented naming and packaging comments on the schema - Test cases refactoring Issue-ID: CPS-1719 Change-Id: I9864f90446720fe903fb3c1502d86035d886751d Signed-off-by: mpriyank --- .../cps/ncmp/api/impl/events/EventsPublisher.java | 17 ++++++- .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 35 ++++---------- .../ncmp/api/impl/events/avc/AvcEventMapper.java | 35 -------------- .../impl/events/avc/AvcEventConsumerSpec.groovy | 53 +++++++++++----------- .../src/test/resources/sampleAvcInputEvent.json | 2 +- 5 files changed, 51 insertions(+), 91 deletions(-) delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java (limited to 'cps-ncmp-service') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 7b28b4cd5f..e61e7729be 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -50,6 +50,19 @@ public class EventsPublisher { private final KafkaTemplate cloudEventKafkaTemplate; + /** + * Generic CloudEvent publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final ListenableFuture> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + /** * Generic Event publisher. * @@ -95,7 +108,7 @@ public class EventsPublisher { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback> handleCallback(final String topicName) { + private ListenableFutureCallback> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { @@ -103,7 +116,7 @@ public class EventsPublisher { } @Override - public void onSuccess(final SendResult sendResult) { + public void onSuccess(final SendResult sendResult) { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index f37497abe6..b5ca176d1d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,19 +20,17 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,34 +45,19 @@ public class AvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher eventsPublisher; - private final AvcEventMapper avcEventMapper; - + private final EventsPublisher eventsPublisher; /** * Incoming AvcEvent in the form of Consumer Record. * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) - public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + @KafkaListener(topics = "${app.dmi.cm-events.topic}") + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); - final String mutatedEventId = UUID.randomUUID().toString(); - mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); - eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), - outgoingAvcEvent); - } - - private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { - final String eventId = "eventId"; - final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); - eventHeaders.remove(eventId); - log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, - mutatedEventId); - eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); - + final String newEventId = UUID.randomUUID().toString(); + final CloudEvent outgoingAvcEvent = + CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java deleted file mode 100644 index 8246ed4802..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avc; - -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; - - -/** - * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. - */ -@Mapper(componentModel = "spring") -public interface AvcEventMapper { - - AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 3dffac714b..4a9e3ee811 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -21,21 +21,23 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.avc.v1.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext -import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -46,52 +48,49 @@ import java.time.Duration class AvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class) + EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper) + AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher) @Autowired JsonObjectMapper jsonObjectMapper - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription on a topic' def cmEventsTopicName = 'cm-events' acvEventConsumer.cmEventsTopicName = cmEventsTopicName - legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List) + cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('sample-eventid') + .withType('sample-test-type') + .withSource(URI.create('sample-test-source')) + .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) - consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) + def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) when: 'the event is consumed' acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' - def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) + def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + def cloudevent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' - record.headers().forEach(header -> { - if (header.key().equals('eventCorrelationId')) { - assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' - record.headers().forEach(header -> { - if (header.key().equals('eventId')) { - assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid' and: 'the event payload still matches' assert testEventSent == convertedAvcEvent } diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index 569343fed9..5b297c86c2 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,5 +1,5 @@ { - "event":{ + "data":{ "push-change-update":{ "datastore-changes":{ "ietf-yang-patch:yang-patch":{ -- cgit 1.2.3-korg