aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/test
diff options
context:
space:
mode:
authorsourabh_sourabh <sourabh.sourabh@est.tech>2023-06-19 17:35:15 +0100
committersourabh_sourabh <sourabh.sourabh@est.tech>2023-06-21 13:34:43 +0100
commitdd4bcaadf2d5b5420669764951ab97eff6ffa4f5 (patch)
treefebeb81466c2e9ec68ae1b3b787fc29a57bcfa65 /cps-ncmp-service/src/test
parent43e3e06f795a46c392004af1acb97db3b2f2cfb6 (diff)
Patch # 3: Data operation response event (NCMP → Client App) to comply with CloudEvents
- Modified data operation record strategy to consume cloud event. - Modified NCMP data operation event consumer to read cloud event header. (prefixed with ce_) - Modified event publisher to support legacy and cloud event based on event type (if legacy event use legacy kafka template else cloud kafka template). - Introduced a new method onto json object mapper to convert json object to bytes. - Modified data operation consumer spec to produce a cloud event and validate it. - Added Kafka Integration Test (for filtering) Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech> Change-Id: Ide701b1ff952f57413cb4e4aa0d55c08753f0298 Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/test')
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy61
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy79
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy5
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy1
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml2
-rw-r--r--cps-ncmp-service/src/test/resources/dataOperationEvent.json33
6 files changed, 130 insertions, 51 deletions
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
new file mode 100644
index 0000000000..3db8520c29
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
@@ -0,0 +1,61 @@
+package org.onap.cps.ncmp.api.impl.async
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.kafka.CloudEventSerializer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+import java.util.concurrent.TimeUnit
+
+@SpringBootTest(classes =[NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy])
+@DirtiesContext
+@Testcontainers
+@EnableAutoConfiguration
+class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
+
+ @Autowired
+ private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
+
+ @SpringBean
+ EventsPublisher mockEventsPublisher = Mock()
+
+ def activateListeners() {
+ kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+ messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+ )
+ }
+
+ def 'Filtering Events.'() {
+ given: 'a cloud event of type: #eventType'
+ def cloudEvent = CloudEventBuilder.v1().withId("some-uuid")
+ .withType(eventType)
+ .withSource(URI.create("sample-test-source"))
+ .build();
+ and: 'activate message listener container'
+ activateListeners()
+ when: 'send the cloud event'
+ ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent)
+ KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
+ producer.send(record);
+ and: 'wait a little for async processing of message'
+ TimeUnit.MILLISECONDS.sleep(100)
+ then: 'the event has only been forwarded for the correct type'
+ expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _)
+ where:
+ eventType || expectedNUmberOfCallsToPublishForwardedEvent
+ 'DataOperationEvent' || 1
+ 'other type' || 0
+ 'any type contain the word "DataOperationEvent"' || 1
+ }
+}
+
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
index d9b9ce6db0..7f8469aafc 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
@@ -21,11 +21,16 @@
package org.onap.cps.ncmp.api.impl.async
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
@@ -45,41 +50,56 @@ import java.time.Duration
class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+ NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+ RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
- def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+ @Autowired
+ ObjectMapper objectMapper
+
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
- legacyEventKafkaConsumer.subscribe([clientTopic])
+ cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for data operation event'
def consumerRecordIn = createConsumerRecord(dataOperationType)
when: 'the data operation event is consumed and published to client specified topic'
- asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+ objectUnderTest.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
- def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
- then: 'verifying consumed event operationID is same as published event operationID'
- def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
- def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
- assert operationIdIn == operationIdOut
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ then: 'verify cloud compliant headers'
+ def consumerRecordOutHeaders = consumerRecordOut.headers()
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+ and: 'verify that extension is included into header'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+ and: 'map consumer record to expected event type'
+ def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+ PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ and: 'verify published response data properties'
+ def response = dataOperationResponseEvent.data.responses[0]
+ response.operationId == 'some-operation-id'
+ response.statusCode == 'any-success-status-code'
+ response.statusMessage == 'Successfully applied changes'
+ response.responseContent as String == '[some-key:some-value]'
}
def 'Filter an event with type #eventType'() {
given: 'consumer record for event with type #eventType'
def consumerRecord = createConsumerRecord(eventType)
when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
- def result = recordFilterStrategy.filter(consumerRecord)
+ def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
then: 'the event is #description'
assert result == expectedResult
where: 'filter the event based on the eventType #eventType'
@@ -90,14 +110,27 @@ class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
def createConsumerRecord(eventTypeAsString) {
def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
- def eventTarget = SerializationUtils.serialize(clientTopic)
- def eventType = SerializationUtils.serialize(eventTypeAsString)
- def eventId = SerializationUtils.serialize('12345')
- def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
- consumerRecord.headers().add(new RecordHeader('eventId', eventId))
- consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
- consumerRecord.headers().add(new RecordHeader('eventType', eventType))
+ def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+ CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+ def headers = new RecordHeaders()
+ def cloudEventSerializer = new CloudEventSerializer()
+ cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
+
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+ headers.forEach(header -> consumerRecord.headers().add(header))
return consumerRecord
}
+
+ def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
+ return CloudEventBuilder.v1()
+ .withId("some-uuid")
+ .withType(eventTypeAsString)
+ .withSource(URI.create("sample-test-source"))
+ .withData(testEventSentAsBytes)
+ .withExtension("correlationid", "request-id")
+ .withExtension("destination", clientTopic)
+ .build();
+ }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 4a9e3ee811..5cc70e2809 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
-
import java.time.Duration
@SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -85,8 +84,8 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
assert records.size() == 1
and: 'record can be converted to AVC event'
def record = records.iterator().next()
- def cloudevent = record.value() as CloudEvent
- def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+ def cloudEvent = record.value() as CloudEvent
+ def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
and: 'we have correct headers forwarded where correlation id matches'
assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id differs(as per requirement) between consumed and forwarded'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
index 3ae6348e96..7bdf335e3d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
@@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.inventory
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
-
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 197bfda19c..df34f844d2 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -26,6 +26,8 @@ spring:
app:
ncmp:
+ async-m2m:
+ topic: ncmp-async-m2m
avc:
subscription-topic: cm-avc-subscription
cm-events-topic: cm-events
diff --git a/cps-ncmp-service/src/test/resources/dataOperationEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
index 42268c0ef3..0a32f38c0a 100644
--- a/cps-ncmp-service/src/test/resources/dataOperationEvent.json
+++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
@@ -1,30 +1,15 @@
{
- "data":{
- "responses":[
+ "data": {
+ "responses": [
{
- "operationId":"1",
- "ids":[
- "123",
- "124"
+ "operationId": "some-operation-id",
+ "ids": [
+ "cm-handle-id"
],
- "statusCode":1,
- "statusMessage":"Batch operation success on the above cmhandle ids ",
- "responseContent":{
- "ietf-netconf-monitoring:netconf-state":{
- "schemas":{
- "schema":[
- {
- "identifier":"ietf-tls-server",
- "version":"2016-11-02",
- "format":"ietf-netconf-monitoring:yang",
- "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server",
- "location":[
- "NETCONF"
- ]
- }
- ]
- }
- }
+ "statusCode": "any-success-status-code",
+ "statusMessage": "Successfully applied changes",
+ "responseContent": {
+ "some-key": "some-value"
}
}
]