diff options
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java) | 10 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java) | 31 | ||||
-rw-r--r-- | cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy) | 38 | ||||
-rw-r--r-- | cps-ncmp-service/src/test/resources/dataOperationEvent.json (renamed from cps-ncmp-service/src/test/resources/batchDataEvent.json) | 24 |
4 files changed, 44 insertions, 59 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java index b343d70a7a..9e2b66a2c1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; /** - * Batch Record filter strategy, which helps to filter the consumer records. + * Data operation record filter strategy, which helps to filter the consumer records. * */ @Configuration -public class BatchRecordFilterStrategy { +public class DataOperationRecordFilterStrategy { /** * Filtering the consumer records based on the eventType header, It @@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() { + public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() { return consumedRecord -> { final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); if (eventTypeHeader == null) { @@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy { } final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + && eventTypeHeaderValue.contains("DataOperationEvent")); }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 2a332d0037..995a4d5a67 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for cps-ncmp async batch events. + * Listener for cps-ncmp async data operation events. */ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { +public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher; + private final EventsPublisher<DataOperationEvent> eventsPublisher; /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' * and publish the same to the client specified topic. * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @KafkaListener( topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent> + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy index 02071cd8cf..d9b9ce6db0 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -28,7 +28,7 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean @@ -37,43 +37,41 @@ import org.springframework.boot.test.context.SpringBootTest import org.springframework.kafka.listener.adapter.RecordFilterStrategy import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration -@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper, - ObjectMapper]) +@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper]) @Testcontainers @DirtiesContext -class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { +class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) + NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) @Autowired JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy + RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) def static clientTopic = 'client-topic' - def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' + def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' legacyEventKafkaConsumer.subscribe([clientTopic]) - and: 'consumer record for batch event' - def consumerRecordIn = createConsumerRecord(batchEventType) - when: 'the batch event is consumed and published to client specified topic' - asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) + and: 'consumer record for data operation event' + def consumerRecordIn = createConsumerRecord(dataOperationType) + when: 'the data operation event is consumed and published to client specified topic' + asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId + def operationIdIn = consumerRecordIn.value.data.responses[0].operationId + def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId assert operationIdIn == operationIdOut } @@ -85,14 +83,14 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { then: 'the event is #description' assert result == expectedResult where: 'filter the event based on the eventType #eventType' - description | eventType || expectedResult - 'not filtered(the consumer will see the event)' | batchEventType || false - 'filtered(the consumer will not see the event)' | 'wrongType' || true + description | eventType || expectedResult + 'not filtered(the consumer will see the event)' | dataOperationType || false + 'filtered(the consumer will not see the event)' | 'wrongType' || true } def createConsumerRecord(eventTypeAsString) { - def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class) + def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) def eventTarget = SerializationUtils.serialize(clientTopic) def eventType = SerializationUtils.serialize(eventTypeAsString) def eventId = SerializationUtils.serialize('12345') diff --git a/cps-ncmp-service/src/test/resources/batchDataEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json index 49eb273f58..42268c0ef3 100644 --- a/cps-ncmp-service/src/test/resources/batchDataEvent.json +++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json @@ -1,15 +1,15 @@ { - "event":{ - "batch-responses":[ + "data":{ + "responses":[ { "operationId":"1", "ids":[ "123", "124" ], - "status-code":1, - "status-message":"Batch operation success on the above cmhandle ids ", - "data":{ + "statusCode":1, + "statusMessage":"Batch operation success on the above cmhandle ids ", + "responseContent":{ "ietf-netconf-monitoring:netconf-state":{ "schemas":{ "schema":[ @@ -26,20 +26,6 @@ } } } - }, - { - "operationId":"101", - "ids":[ - "456", - "457" - ], - "status-code":101, - "status-message":"cmHandle(s) do not exist", - "data":{ - "error":{ - "message":"cmHandle(s) do not exist" - } - } } ] } |