From f4c3f0fcebec726ea74b44f9bca3b68e66176671 Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Thu, 8 Jun 2023 15:25:33 +0100 Subject: Patch # 1: Data operation response event (NCMP → Client App) to comply with CloudEvents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removed header definitions (since we now use CloudEvents) - Used 'dataOperation' instead of batch where appropriate. - Modified test json Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh Change-Id: Ic0f65297b944adf9cf5f3c2cbec679a031a675ec Signed-off-by: sourabh_sourabh --- .../async/NcmpAsyncBatchEventConsumerSpec.groovy | 105 --------------------- .../NcmpAsyncDataOperationEventConsumerSpec.groovy | 103 ++++++++++++++++++++ 2 files changed, 103 insertions(+), 105 deletions(-) delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy (limited to 'cps-ncmp-service/src/test/groovy/org/onap') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy deleted file mode 100644 index 02071cd8c..000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.async - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.commons.lang3.SerializationUtils -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.ncmp.api.impl.events.EventsPublisher -import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 -import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.utils.JsonObjectMapper -import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.kafka.listener.adapter.RecordFilterStrategy -import org.springframework.test.annotation.DirtiesContext -import org.testcontainers.spock.Testcontainers - -import java.time.Duration - -@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper, - ObjectMapper]) -@Testcontainers -@DirtiesContext -class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { - - @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) - - @Autowired - JsonObjectMapper jsonObjectMapper - - @Autowired - RecordFilterStrategy recordFilterStrategy - - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) - def static clientTopic = 'client-topic' - def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' - - def 'Consume and publish event to client specified topic'() { - given: 'consumer subscribing to client topic' - legacyEventKafkaConsumer.subscribe([clientTopic]) - and: 'consumer record for batch event' - def consumerRecordIn = createConsumerRecord(batchEventType) - when: 'the batch event is consumed and published to client specified topic' - asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) - and: 'the client specified topic is polled' - def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] - then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId - assert operationIdIn == operationIdOut - } - - def 'Filter an event with type #eventType'() { - given: 'consumer record for event with type #eventType' - def consumerRecord = createConsumerRecord(eventType) - when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' - def result = recordFilterStrategy.filter(consumerRecord) - then: 'the event is #description' - assert result == expectedResult - where: 'filter the event based on the eventType #eventType' - description | eventType || expectedResult - 'not filtered(the consumer will see the event)' | batchEventType || false - 'filtered(the consumer will not see the event)' | 'wrongType' || true - } - - def createConsumerRecord(eventTypeAsString) { - def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class) - def eventTarget = SerializationUtils.serialize(clientTopic) - def eventType = SerializationUtils.serialize(eventTypeAsString) - def eventId = SerializationUtils.serialize('12345') - def consumerRecord = new ConsumerRecord(clientTopic, 1, 1L, '123', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', eventId)) - consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) - consumerRecord.headers().add(new RecordHeader('eventType', eventType)) - return consumerRecord - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy new file mode 100644 index 000000000..d9b9ce6db --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.commons.lang3.SerializationUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.cps.ncmp.api.impl.events.EventsPublisher +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.listener.adapter.RecordFilterStrategy +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers +import java.time.Duration + +@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper]) +@Testcontainers +@DirtiesContext +class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { + + @SpringBean + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + + @SpringBean + NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) + + @Autowired + JsonObjectMapper jsonObjectMapper + + @Autowired + RecordFilterStrategy recordFilterStrategy + + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) + def static clientTopic = 'client-topic' + def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' + + def 'Consume and publish event to client specified topic'() { + given: 'consumer subscribing to client topic' + legacyEventKafkaConsumer.subscribe([clientTopic]) + and: 'consumer record for data operation event' + def consumerRecordIn = createConsumerRecord(dataOperationType) + when: 'the data operation event is consumed and published to client specified topic' + asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) + and: 'the client specified topic is polled' + def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + then: 'verifying consumed event operationID is same as published event operationID' + def operationIdIn = consumerRecordIn.value.data.responses[0].operationId + def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId + assert operationIdIn == operationIdOut + } + + def 'Filter an event with type #eventType'() { + given: 'consumer record for event with type #eventType' + def consumerRecord = createConsumerRecord(eventType) + when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' + def result = recordFilterStrategy.filter(consumerRecord) + then: 'the event is #description' + assert result == expectedResult + where: 'filter the event based on the eventType #eventType' + description | eventType || expectedResult + 'not filtered(the consumer will see the event)' | dataOperationType || false + 'filtered(the consumer will not see the event)' | 'wrongType' || true + } + + def createConsumerRecord(eventTypeAsString) { + def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) + def eventTarget = SerializationUtils.serialize(clientTopic) + def eventType = SerializationUtils.serialize(eventTypeAsString) + def eventId = SerializationUtils.serialize('12345') + def consumerRecord = new ConsumerRecord(clientTopic, 1, 1L, '123', testEventSent) + consumerRecord.headers().add(new RecordHeader('eventId', eventId)) + consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) + consumerRecord.headers().add(new RecordHeader('eventType', eventType)) + return consumerRecord + } +} -- cgit 1.2.3-korg