From dd4bcaadf2d5b5420669764951ab97eff6ffa4f5 Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Mon, 19 Jun 2023 17:35:15 +0100 Subject: Patch # 3: Data operation response event (NCMP → Client App) to comply with CloudEvents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Modified data operation record strategy to consume cloud event. - Modified NCMP data operation event consumer to read cloud event header. (prefixed with ce_) - Modified event publisher to support legacy and cloud event based on event type (if legacy event use legacy kafka template else cloud kafka template). - Introduced a new method onto json object mapper to convert json object to bytes. - Modified data operation consumer spec to produce a cloud event and validate it. - Added Kafka Integration Test (for filtering) Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh Change-Id: Ide701b1ff952f57413cb4e4aa0d55c08753f0298 Signed-off-by: sourabh_sourabh --- ...ataOperationEventConsumerIntegrationSpec.groovy | 61 +++++++++++++++++ .../NcmpAsyncDataOperationEventConsumerSpec.groovy | 79 +++++++++++++++------- .../impl/events/avc/AvcEventConsumerSpec.groovy | 5 +- .../ncmp/api/inventory/CompositeStateSpec.groovy | 1 - 4 files changed, 119 insertions(+), 27 deletions(-) create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy (limited to 'cps-ncmp-service/src/test/groovy') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy new file mode 100644 index 0000000000..3db8520c29 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy @@ -0,0 +1,61 @@ +package org.onap.cps.ncmp.api.impl.async + +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.kafka.CloudEventSerializer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.onap.cps.ncmp.api.impl.events.EventsPublisher +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.config.KafkaListenerEndpointRegistry +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers +import java.util.concurrent.TimeUnit + +@SpringBootTest(classes =[NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy]) +@DirtiesContext +@Testcontainers +@EnableAutoConfiguration +class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec { + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry + + @SpringBean + EventsPublisher mockEventsPublisher = Mock() + + def activateListeners() { + kafkaListenerEndpointRegistry.getListenerContainers().forEach( + messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) } + ) + } + + def 'Filtering Events.'() { + given: 'a cloud event of type: #eventType' + def cloudEvent = CloudEventBuilder.v1().withId("some-uuid") + .withType(eventType) + .withSource(URI.create("sample-test-source")) + .build(); + and: 'activate message listener container' + activateListeners() + when: 'send the cloud event' + ProducerRecord record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent) + KafkaProducer producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer)) + producer.send(record); + and: 'wait a little for async processing of message' + TimeUnit.MILLISECONDS.sleep(100) + then: 'the event has only been forwarded for the correct type' + expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _) + where: + eventType || expectedNUmberOfCallsToPublishForwardedEvent + 'DataOperationEvent' || 1 + 'other type' || 0 + 'any type contain the word "DataOperationEvent"' || 1 + } +} + diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy index d9b9ce6db0..7f8469aafc 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -21,11 +21,16 @@ package org.onap.cps.ncmp.api.impl.async import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.commons.lang3.SerializationUtils +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer +import io.cloudevents.kafka.impl.KafkaHeaders +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.header.internals.RecordHeaders import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent @@ -45,41 +50,56 @@ import java.time.Duration class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) + NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) @Autowired JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy recordFilterStrategy + RecordFilterStrategy dataOperationRecordFilterStrategy - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer)) def static clientTopic = 'client-topic' def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' - legacyEventKafkaConsumer.subscribe([clientTopic]) + cloudEventKafkaConsumer.subscribe([clientTopic]) and: 'consumer record for data operation event' def consumerRecordIn = createConsumerRecord(dataOperationType) when: 'the data operation event is consumed and published to client specified topic' - asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) + objectUnderTest.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' - def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] - then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.data.responses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId - assert operationIdIn == operationIdOut + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + then: 'verify cloud compliant headers' + def consumerRecordOutHeaders = consumerRecordOut.headers() + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType + and: 'verify that extension is included into header' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic + and: 'map consumer record to expected event type' + def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(), + PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue() + and: 'verify published response data properties' + def response = dataOperationResponseEvent.data.responses[0] + response.operationId == 'some-operation-id' + response.statusCode == 'any-success-status-code' + response.statusMessage == 'Successfully applied changes' + response.responseContent as String == '[some-key:some-value]' } def 'Filter an event with type #eventType'() { given: 'consumer record for event with type #eventType' def consumerRecord = createConsumerRecord(eventType) when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' - def result = recordFilterStrategy.filter(consumerRecord) + def result = dataOperationRecordFilterStrategy.filter(consumerRecord) then: 'the event is #description' assert result == expectedResult where: 'filter the event based on the eventType #eventType' @@ -90,14 +110,27 @@ class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { def createConsumerRecord(eventTypeAsString) { def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) - def eventTarget = SerializationUtils.serialize(clientTopic) - def eventType = SerializationUtils.serialize(eventTypeAsString) - def eventId = SerializationUtils.serialize('12345') - def consumerRecord = new ConsumerRecord(clientTopic, 1, 1L, '123', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', eventId)) - consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) - consumerRecord.headers().add(new RecordHeader('eventType', eventType)) + def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)) + + CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes) + + def headers = new RecordHeaders() + def cloudEventSerializer = new CloudEventSerializer() + cloudEventSerializer.serialize(clientTopic, headers, cloudEvent) + + def consumerRecord = new ConsumerRecord(clientTopic, 0, 0L, 'sample-message-key', cloudEvent) + headers.forEach(header -> consumerRecord.headers().add(header)) return consumerRecord } + + def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) { + return CloudEventBuilder.v1() + .withId("some-uuid") + .withType(eventTypeAsString) + .withSource(URI.create("sample-test-source")) + .withData(testEventSentAsBytes) + .withExtension("correlationid", "request-id") + .withExtension("destination", clientTopic) + .build(); + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 4a9e3ee811..5cc70e2809 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper]) @@ -85,8 +84,8 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def cloudevent = record.value() as CloudEvent - def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() + def cloudEvent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy index 3ae6348e96..7bdf335e3d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy @@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.inventory import com.fasterxml.jackson.databind.ObjectMapper import spock.lang.Specification - import java.time.OffsetDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter -- cgit 1.2.3-korg