From f4c3f0fcebec726ea74b44f9bca3b68e66176671 Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Thu, 8 Jun 2023 15:25:33 +0100 Subject: Patch # 1: Data operation response event (NCMP → Client App) to comply with CloudEvents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removed header definitions (since we now use CloudEvents) - Used 'dataOperation' instead of batch where appropriate. - Modified test json Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh Change-Id: Ic0f65297b944adf9cf5f3c2cbec679a031a675ec Signed-off-by: sourabh_sourabh --- .../schemas/async/batch-event-headers-1.0.0.json | 55 ----------- .../schemas/async/batch-event-schema-1.0.0.json | 67 ------------- .../async/data-operation-event-schema-1.0.0.json | 66 +++++++++++++ .../api/impl/async/BatchRecordFilterStrategy.java | 55 ----------- .../async/DataOperationRecordFilterStrategy.java | 55 +++++++++++ .../impl/async/NcmpAsyncBatchEventConsumer.java | 64 ------------- .../async/NcmpAsyncDataOperationEventConsumer.java | 65 +++++++++++++ .../async/NcmpAsyncBatchEventConsumerSpec.groovy | 105 --------------------- .../NcmpAsyncDataOperationEventConsumerSpec.groovy | 103 ++++++++++++++++++++ .../src/test/resources/batchDataEvent.json | 46 --------- .../src/test/resources/dataOperationEvent.json | 32 +++++++ 11 files changed, 321 insertions(+), 392 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json delete mode 100644 cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json create mode 100644 cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy delete mode 100644 cps-ncmp-service/src/test/resources/batchDataEvent.json create mode 100644 cps-ncmp-service/src/test/resources/dataOperationEvent.json diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json deleted file mode 100644 index bbcadcd0f4..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-headers:1.0.0", - "$ref": "#/definitions/BatchEventHeaders", - "definitions": { - "BatchEventHeaders": { - "description": "The header information of the Batch event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchEventHeadersV1", - "properties": { - "eventId": { - "description": "The unique id for identifying the event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id received by NCMP as an acknowledgement.", - "type": "string" - }, - "eventTime": { - "description": "The time of the event. It should be in RFC format ('yyyy-MM-dd'T'HH:mm:ss.SSSZ').", - "type": "string" - }, - "eventTarget": { - "description": "The destination topic to forward the consumed event.", - "type": "string" - }, - "eventSource": { - "description": "The source of the event.", - "type": "string" - }, - "eventType": { - "description": "The type of the Batch event.", - "type": "string" - }, - "eventSchema": { - "description": "The schema of the Batch event payload.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The schema version of the Batch event payload.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventTarget", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json deleted file mode 100644 index da836ff167..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-schema:1.0.0", - "$ref": "#/definitions/BatchDataResponseEvent", - "definitions": { - "BatchDataResponseEvent": { - "description": "The payload of batch event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataResponseEventV1", - "properties": { - "event": { - "description": "The payload content of the requested data.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataEvent", - "properties": { - "batch-responses": { - "description": "An array of batch responses which contains both success and failure", - "type": "array", - "items": { - "type": "object", - "properties": { - "operationId": { - "description": "Used to distinguish multiple operations using same cmhandleId", - "type": "string" - }, - "ids": { - "description": "Id's of the cmhandles", - "type": "array" - }, - "status-code": { - "description": "which says success or failure (0-99) are for success and (100-199) are for failure", - "type": "string" - }, - "status-message": { - "description": "Human readable message, Which says what the response has", - "type": "string" - }, - "data": { - "description": "Contains the requested data response.", - "type": "object", - "existingJavaType": "java.lang.Object", - "additionalProperties": false - } - }, - "required": [ - "operationId", - "ids", - "status-code", - "status-message" - ], - "additionalProperties": false - } - } - }, - "required": [ - "batch-responses" - ], - "additionalProperties": false - } - }, - "required": [ - "event" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json new file mode 100644 index 0000000000..308e3068d6 --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json @@ -0,0 +1,66 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events.async:data-operation-event-schema:1.0.0", + "$ref": "#/definitions/DataOperationEvent", + "definitions": { + "DataOperationEvent": { + "description": "The payload of data operation event.", + "type": "object", + "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent", + "properties": { + "data": { + "description": "The payload content of the requested data.", + "type": "object", + "properties": { + "responses": { + "description": "An array of batch responses which contains both success and failure", + "type": "array", + "items": { + "type": "object", + "properties": { + "operationId": { + "description": "Used to distinguish multiple operations using same cmhandleId", + "type": "string" + }, + "ids": { + "description": "Id's of the cmhandles", + "type": "array" + }, + "statusCode": { + "description": "which says success or failure (0-99) are for success and (100-199) are for failure", + "type": "string" + }, + "statusMessage": { + "description": "Human readable message, Which says what the response has", + "type": "string" + }, + "responseContent": { + "description": "Contains the requested data response.", + "type": "object", + "existingJavaType": "java.lang.Object", + "additionalProperties": false + } + }, + "required": [ + "operationId", + "ids", + "statusCode", + "statusMessage" + ], + "additionalProperties": false + } + } + }, + "required": [ + "responses" + ], + "additionalProperties": false + } + }, + "required": [ + "data" + ], + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java deleted file mode 100644 index b343d70a7a..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.async; - -import org.apache.commons.lang3.SerializationUtils; -import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.listener.adapter.RecordFilterStrategy; - -/** - * Batch Record filter strategy, which helps to filter the consumer records. - * - */ -@Configuration -public class BatchRecordFilterStrategy { - - /** - * Filtering the consumer records based on the eventType header, It - * returns boolean, true means filter the consumer record and false - * means not filter the consumer record. - * @return boolean value. - */ - @Bean - public RecordFilterStrategy filterBatchDataResponseEvent() { - return consumedRecord -> { - final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); - if (eventTypeHeader == null) { - return false; - } - final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); - return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); - }; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java new file mode 100644 index 0000000000..9e2b66a2c1 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kafka.common.header.Header; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; + +/** + * Data operation record filter strategy, which helps to filter the consumer records. + * + */ +@Configuration +public class DataOperationRecordFilterStrategy { + + /** + * Filtering the consumer records based on the eventType header, It + * returns boolean, true means filter the consumer record and false + * means not filter the consumer record. + * @return boolean value. + */ + @Bean + public RecordFilterStrategy includeDataOperationEventsOnly() { + return consumedRecord -> { + final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); + if (eventTypeHeader == null) { + return false; + } + final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); + return !(eventTypeHeaderValue != null + && eventTypeHeaderValue.contains("DataOperationEvent")); + }; + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java deleted file mode 100644 index 2a332d0037..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.async; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -/** - * Listener for cps-ncmp async batch events. - */ -@Component -@Slf4j -@RequiredArgsConstructor -@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { - - private final EventsPublisher eventsPublisher; - - /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' - * and publish the same to the client specified topic. - * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. - */ - @KafkaListener( - topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); - final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); - final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java new file mode 100644 index 0000000000..995a4d5a67 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * Listener for cps-ncmp async data operation events. + */ +@Component +@Slf4j +@RequiredArgsConstructor +@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) +public class NcmpAsyncDataOperationEventConsumer { + + private final EventsPublisher eventsPublisher; + + /** + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' + * and publish the same to the client specified topic. + * + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. + */ + @KafkaListener( + topics = "${app.ncmp.async-m2m.topic}", + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); + final String eventTarget = SerializationUtils + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); + final String eventId = SerializationUtils + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy deleted file mode 100644 index 02071cd8cf..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.async - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.commons.lang3.SerializationUtils -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.ncmp.api.impl.events.EventsPublisher -import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 -import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.utils.JsonObjectMapper -import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.kafka.listener.adapter.RecordFilterStrategy -import org.springframework.test.annotation.DirtiesContext -import org.testcontainers.spock.Testcontainers - -import java.time.Duration - -@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper, - ObjectMapper]) -@Testcontainers -@DirtiesContext -class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { - - @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) - - @Autowired - JsonObjectMapper jsonObjectMapper - - @Autowired - RecordFilterStrategy recordFilterStrategy - - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) - def static clientTopic = 'client-topic' - def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' - - def 'Consume and publish event to client specified topic'() { - given: 'consumer subscribing to client topic' - legacyEventKafkaConsumer.subscribe([clientTopic]) - and: 'consumer record for batch event' - def consumerRecordIn = createConsumerRecord(batchEventType) - when: 'the batch event is consumed and published to client specified topic' - asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) - and: 'the client specified topic is polled' - def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] - then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId - assert operationIdIn == operationIdOut - } - - def 'Filter an event with type #eventType'() { - given: 'consumer record for event with type #eventType' - def consumerRecord = createConsumerRecord(eventType) - when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' - def result = recordFilterStrategy.filter(consumerRecord) - then: 'the event is #description' - assert result == expectedResult - where: 'filter the event based on the eventType #eventType' - description | eventType || expectedResult - 'not filtered(the consumer will see the event)' | batchEventType || false - 'filtered(the consumer will not see the event)' | 'wrongType' || true - } - - def createConsumerRecord(eventTypeAsString) { - def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class) - def eventTarget = SerializationUtils.serialize(clientTopic) - def eventType = SerializationUtils.serialize(eventTypeAsString) - def eventId = SerializationUtils.serialize('12345') - def consumerRecord = new ConsumerRecord(clientTopic, 1, 1L, '123', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', eventId)) - consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) - consumerRecord.headers().add(new RecordHeader('eventType', eventType)) - return consumerRecord - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy new file mode 100644 index 0000000000..d9b9ce6db0 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.commons.lang3.SerializationUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.cps.ncmp.api.impl.events.EventsPublisher +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.listener.adapter.RecordFilterStrategy +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers +import java.time.Duration + +@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper]) +@Testcontainers +@DirtiesContext +class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { + + @SpringBean + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + + @SpringBean + NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) + + @Autowired + JsonObjectMapper jsonObjectMapper + + @Autowired + RecordFilterStrategy recordFilterStrategy + + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) + def static clientTopic = 'client-topic' + def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' + + def 'Consume and publish event to client specified topic'() { + given: 'consumer subscribing to client topic' + legacyEventKafkaConsumer.subscribe([clientTopic]) + and: 'consumer record for data operation event' + def consumerRecordIn = createConsumerRecord(dataOperationType) + when: 'the data operation event is consumed and published to client specified topic' + asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) + and: 'the client specified topic is polled' + def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + then: 'verifying consumed event operationID is same as published event operationID' + def operationIdIn = consumerRecordIn.value.data.responses[0].operationId + def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId + assert operationIdIn == operationIdOut + } + + def 'Filter an event with type #eventType'() { + given: 'consumer record for event with type #eventType' + def consumerRecord = createConsumerRecord(eventType) + when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy' + def result = recordFilterStrategy.filter(consumerRecord) + then: 'the event is #description' + assert result == expectedResult + where: 'filter the event based on the eventType #eventType' + description | eventType || expectedResult + 'not filtered(the consumer will see the event)' | dataOperationType || false + 'filtered(the consumer will not see the event)' | 'wrongType' || true + } + + def createConsumerRecord(eventTypeAsString) { + def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) + def eventTarget = SerializationUtils.serialize(clientTopic) + def eventType = SerializationUtils.serialize(eventTypeAsString) + def eventId = SerializationUtils.serialize('12345') + def consumerRecord = new ConsumerRecord(clientTopic, 1, 1L, '123', testEventSent) + consumerRecord.headers().add(new RecordHeader('eventId', eventId)) + consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget)) + consumerRecord.headers().add(new RecordHeader('eventType', eventType)) + return consumerRecord + } +} diff --git a/cps-ncmp-service/src/test/resources/batchDataEvent.json b/cps-ncmp-service/src/test/resources/batchDataEvent.json deleted file mode 100644 index 49eb273f58..0000000000 --- a/cps-ncmp-service/src/test/resources/batchDataEvent.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "event":{ - "batch-responses":[ - { - "operationId":"1", - "ids":[ - "123", - "124" - ], - "status-code":1, - "status-message":"Batch operation success on the above cmhandle ids ", - "data":{ - "ietf-netconf-monitoring:netconf-state":{ - "schemas":{ - "schema":[ - { - "identifier":"ietf-tls-server", - "version":"2016-11-02", - "format":"ietf-netconf-monitoring:yang", - "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server", - "location":[ - "NETCONF" - ] - } - ] - } - } - } - }, - { - "operationId":"101", - "ids":[ - "456", - "457" - ], - "status-code":101, - "status-message":"cmHandle(s) do not exist", - "data":{ - "error":{ - "message":"cmHandle(s) do not exist" - } - } - } - ] - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/dataOperationEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json new file mode 100644 index 0000000000..42268c0ef3 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json @@ -0,0 +1,32 @@ +{ + "data":{ + "responses":[ + { + "operationId":"1", + "ids":[ + "123", + "124" + ], + "statusCode":1, + "statusMessage":"Batch operation success on the above cmhandle ids ", + "responseContent":{ + "ietf-netconf-monitoring:netconf-state":{ + "schemas":{ + "schema":[ + { + "identifier":"ietf-tls-server", + "version":"2016-11-02", + "format":"ietf-netconf-monitoring:yang", + "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server", + "location":[ + "NETCONF" + ] + } + ] + } + } + } + } + ] + } +} \ No newline at end of file -- cgit 1.2.3-korg