diff options
10 files changed, 151 insertions, 74 deletions
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml index 19ef988d30..608141ebfd 100644 --- a/cps-ncmp-service/pom.xml +++ b/cps-ncmp-service/pom.xml @@ -104,6 +104,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <scope>test</scope> diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java index 9e2b66a2c1..ce666b1099 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -20,9 +20,8 @@ package org.onap.cps.ncmp.api.impl.async; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; +import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.impl.KafkaHeaders; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -41,15 +40,11 @@ public class DataOperationRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() { + public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() { return consumedRecord -> { - final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); - if (eventTypeHeader == null) { - return false; - } - final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); - return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.contains("DataOperationEvent")); + final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader( + consumedRecord.headers(), "ce_type"); + return !(eventTypeHeaderValue.contains("DataOperationEvent")); }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 995a4d5a67..4a0ec5c493 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -20,12 +20,12 @@ package org.onap.cps.ncmp.api.impl.async; +import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.impl.KafkaHeaders; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -39,7 +39,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher<DataOperationEvent> eventsPublisher; + private final EventsPublisher<CloudEvent> eventsPublisher; /** * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' @@ -52,14 +52,12 @@ public class NcmpAsyncDataOperationEventConsumer { filter = "includeDataOperationEventsOnly", groupId = "ncmp-data-operation-event-group", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) - public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent> - dataOperationEventConsumerRecord) { + public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) { log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); - final String eventTarget = SerializationUtils - .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); - final String eventId = SerializationUtils - .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), - dataOperationEventConsumerRecord.value()); + final String eventTarget = KafkaHeaders.getParsedKafkaHeader( + dataOperationEventConsumerRecord.headers(), "ce_destination"); + final String eventId = KafkaHeaders.getParsedKafkaHeader( + dataOperationEventConsumerRecord.headers(), "ce_id"); + eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index e61e7729be..05c731d946 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -71,7 +71,7 @@ public class EventsPublisher<T> { * @param event message payload * @deprecated This method is not needed anymore since the use of headers will be in place. */ - @Deprecated + @Deprecated(forRemoval = true) public void publishEvent(final String topicName, final String eventKey, final T event) { final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(topicName, eventKey, event); diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy new file mode 100644 index 0000000000..3db8520c29 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy @@ -0,0 +1,61 @@ +package org.onap.cps.ncmp.api.impl.async + +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.kafka.CloudEventSerializer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.onap.cps.ncmp.api.impl.events.EventsPublisher +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.config.KafkaListenerEndpointRegistry +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers +import java.util.concurrent.TimeUnit + +@SpringBootTest(classes =[NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy]) +@DirtiesContext +@Testcontainers +@EnableAutoConfiguration +class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec { + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry + + @SpringBean + EventsPublisher mockEventsPublisher = Mock() + + def activateListeners() { + kafkaListenerEndpointRegistry.getListenerContainers().forEach( + messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) } + ) + } + + def 'Filtering Events.'() { + given: 'a cloud event of type: #eventType' + def cloudEvent = CloudEventBuilder.v1().withId("some-uuid") + .withType(eventType) + .withSource(URI.create("sample-test-source")) + .build(); + and: 'activate message listener container' + activateListeners() + when: 'send the cloud event' + ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent) + KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer)) + producer.send(record); + and: 'wait a little for async processing of message' + TimeUnit.MILLISECONDS.sleep(100) + then: 'the event has only been forwarded for the correct type' + expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _) + where: + eventType || expectedNUmberOfCallsToPublishForwardedEvent + 'DataOperationEvent' || 1 + 'other type' || 0 + 'any type contain the word "DataOperationEvent"' || 1 + } +} + diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy index d9b9ce6db0..7f8469aafc 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -21,11 +21,16 @@ package org.onap.cps.ncmp.api.impl.async import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.commons.lang3.SerializationUtils +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer +import io.cloudevents.kafka.impl.KafkaHeaders +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.header.internals.RecordHeaders import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent @@ -45,41 +50,56 @@ import java.time.Duration class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) + NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) @Autowired JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy + RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer)) def static clientTopic = 'client-topic' def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' - legacyEventKafkaConsumer.subscribe([clientTopic]) + cloudEventKafkaConsumer.subscribe([clientTopic]) and: 'consumer record for data operation event' def consumerRecordIn = createConsumerRecord(dataOperationType) when: 'the data operation event is consumed and published to client specified topic' - asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) + objectUnderTest.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' - def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] - then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.data.responses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId - assert operationIdIn == operationIdOut + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + then: 'verify cloud compliant headers' + def consumerRecordOutHeaders = consumerRecordOut.headers() + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType + and: 'verify that extension is included into header' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic + and: 'map consumer record to expected event type' + def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(), + PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue() + and: 'verify published response data properties' + def response = dataOperationResponseEvent.data.responses[0] + response.operationId == 'some-operation-id' + response.statusCode == 'any-success-status-code' + response.statusMessage == 'Successfully applied changes' + response.responseContent as String == '[some-key:some-value]' } def 'Filter an event with type #eventType'() { given: 'consumer record for event with type #eventType' def consumerRecord = createConsumerRecord(eventType) when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' - def result = recordFilterStrategy.filter(consumerRecord) + def result = dataOperationRecordFilterStrategy.filter(consumerRecord) then: 'the event is #description' assert result == expectedResult where: 'filter the event based on the eventType #eventType' @@ -90,14 +110,27 @@ class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { def createConsumerRecord(eventTypeAsString) { def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) - def eventTarget = SerializationUtils.serialize(clientTopic) - def eventType = SerializationUtils.serialize(eventTypeAsString) - def eventId = SerializationUtils.serialize('12345') - def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', eventId)) - consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) - consumerRecord.headers().add(new RecordHeader('eventType', eventType)) + def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)) + + CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes) + + def headers = new RecordHeaders() + def cloudEventSerializer = new CloudEventSerializer() + cloudEventSerializer.serialize(clientTopic, headers, cloudEvent) + + def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent) + headers.forEach(header -> consumerRecord.headers().add(header)) return consumerRecord } + + def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) { + return CloudEventBuilder.v1() + .withId("some-uuid") + .withType(eventTypeAsString) + .withSource(URI.create("sample-test-source")) + .withData(testEventSentAsBytes) + .withExtension("correlationid", "request-id") + .withExtension("destination", clientTopic) + .build(); + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 4a9e3ee811..5cc70e2809 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper]) @@ -85,8 +84,8 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def cloudevent = record.value() as CloudEvent - def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() + def cloudEvent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy index 3ae6348e96..7bdf335e3d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy @@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.inventory import com.fasterxml.jackson.databind.ObjectMapper import spock.lang.Specification - import java.time.OffsetDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml index 197bfda19c..df34f844d2 100644 --- a/cps-ncmp-service/src/test/resources/application.yml +++ b/cps-ncmp-service/src/test/resources/application.yml @@ -26,6 +26,8 @@ spring: app: ncmp: + async-m2m: + topic: ncmp-async-m2m avc: subscription-topic: cm-avc-subscription cm-events-topic: cm-events diff --git a/cps-ncmp-service/src/test/resources/dataOperationEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json index 42268c0ef3..0a32f38c0a 100644 --- a/cps-ncmp-service/src/test/resources/dataOperationEvent.json +++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json @@ -1,30 +1,15 @@ { - "data":{ - "responses":[ + "data": { + "responses": [ { - "operationId":"1", - "ids":[ - "123", - "124" + "operationId": "some-operation-id", + "ids": [ + "cm-handle-id" ], - "statusCode":1, - "statusMessage":"Batch operation success on the above cmhandle ids ", - "responseContent":{ - "ietf-netconf-monitoring:netconf-state":{ - "schemas":{ - "schema":[ - { - "identifier":"ietf-tls-server", - "version":"2016-11-02", - "format":"ietf-netconf-monitoring:yang", - "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server", - "location":[ - "NETCONF" - ] - } - ] - } - } + "statusCode": "any-success-status-code", + "statusMessage": "Successfully applied changes", + "responseContent": { + "some-key": "some-value" } } ] |