aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json55
-rw-r--r--cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json (renamed from cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json)29
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java)10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java)31
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy)38
-rw-r--r--cps-ncmp-service/src/test/resources/dataOperationEvent.json (renamed from cps-ncmp-service/src/test/resources/batchDataEvent.json)24
6 files changed, 58 insertions, 129 deletions
diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json
deleted file mode 100644
index bbcadcd0f4..0000000000
--- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json
+++ /dev/null
@@ -1,55 +0,0 @@
-{
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-headers:1.0.0",
- "$ref": "#/definitions/BatchEventHeaders",
- "definitions": {
- "BatchEventHeaders": {
- "description": "The header information of the Batch event.",
- "type": "object",
- "javaType" : "org.onap.cps.ncmp.events.async.BatchEventHeadersV1",
- "properties": {
- "eventId": {
- "description": "The unique id for identifying the event.",
- "type": "string"
- },
- "eventCorrelationId": {
- "description": "The request id received by NCMP as an acknowledgement.",
- "type": "string"
- },
- "eventTime": {
- "description": "The time of the event. It should be in RFC format ('yyyy-MM-dd'T'HH:mm:ss.SSSZ').",
- "type": "string"
- },
- "eventTarget": {
- "description": "The destination topic to forward the consumed event.",
- "type": "string"
- },
- "eventSource": {
- "description": "The source of the event.",
- "type": "string"
- },
- "eventType": {
- "description": "The type of the Batch event.",
- "type": "string"
- },
- "eventSchema": {
- "description": "The schema of the Batch event payload.",
- "type": "string"
- },
- "eventSchemaVersion": {
- "description": "The schema version of the Batch event payload.",
- "type": "string"
- }
- },
- "required": [
- "eventId",
- "eventCorrelationId",
- "eventTarget",
- "eventType",
- "eventSchema",
- "eventSchemaVersion"
- ],
- "additionalProperties": false
- }
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json
index da836ff167..308e3068d6 100644
--- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json
+++ b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json
@@ -1,19 +1,18 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-schema:1.0.0",
- "$ref": "#/definitions/BatchDataResponseEvent",
+ "$id": "urn:cps:org.onap.cps.ncmp.events.async:data-operation-event-schema:1.0.0",
+ "$ref": "#/definitions/DataOperationEvent",
"definitions": {
- "BatchDataResponseEvent": {
- "description": "The payload of batch event.",
+ "DataOperationEvent": {
+ "description": "The payload of data operation event.",
"type": "object",
- "javaType" : "org.onap.cps.ncmp.events.async.BatchDataResponseEventV1",
+ "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent",
"properties": {
- "event": {
+ "data": {
"description": "The payload content of the requested data.",
"type": "object",
- "javaType" : "org.onap.cps.ncmp.events.async.BatchDataEvent",
"properties": {
- "batch-responses": {
+ "responses": {
"description": "An array of batch responses which contains both success and failure",
"type": "array",
"items": {
@@ -27,15 +26,15 @@
"description": "Id's of the cmhandles",
"type": "array"
},
- "status-code": {
+ "statusCode": {
"description": "which says success or failure (0-99) are for success and (100-199) are for failure",
"type": "string"
},
- "status-message": {
+ "statusMessage": {
"description": "Human readable message, Which says what the response has",
"type": "string"
},
- "data": {
+ "responseContent": {
"description": "Contains the requested data response.",
"type": "object",
"existingJavaType": "java.lang.Object",
@@ -45,21 +44,21 @@
"required": [
"operationId",
"ids",
- "status-code",
- "status-message"
+ "statusCode",
+ "statusMessage"
],
"additionalProperties": false
}
}
},
"required": [
- "batch-responses"
+ "responses"
],
"additionalProperties": false
}
},
"required": [
- "event"
+ "data"
],
"additionalProperties": false
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index b343d70a7a..9e2b66a2c1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
/**
- * Batch Record filter strategy, which helps to filter the consumer records.
+ * Data operation record filter strategy, which helps to filter the consumer records.
*
*/
@Configuration
-public class BatchRecordFilterStrategy {
+public class DataOperationRecordFilterStrategy {
/**
* Filtering the consumer records based on the eventType header, It
@@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy {
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
+ public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
if (eventTypeHeader == null) {
@@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy {
}
final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ && eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 2a332d0037..995a4d5a67 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
@@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
- * Listener for cps-ncmp async batch events.
+ * Listener for cps-ncmp async data operation events.
*/
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpAsyncBatchEventConsumer {
+public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher;
+ private final EventsPublisher<DataOperationEvent> eventsPublisher;
/**
- * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic'
+ * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
* and publish the same to the client specified topic.
*
- * @param batchEventConsumerRecord consuming event as a ConsumerRecord.
+ * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
*/
@KafkaListener(
topics = "${app.ncmp.async-m2m.topic}",
- filter = "filterBatchDataResponseEvent",
- groupId = "ncmp-batch-event-group",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"})
- public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) {
- log.info("Consuming event payload {} ...", batchEventConsumerRecord.value());
+ filter = "includeDataOperationEventsOnly",
+ groupId = "ncmp-data-operation-event-group",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
+ public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
+ dataOperationEventConsumerRecord) {
+ log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
final String eventTarget = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
final String eventId = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(),
- batchEventConsumerRecord.value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
+ eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
+ dataOperationEventConsumerRecord.value());
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
index 02071cd8cf..d9b9ce6db0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
@@ -28,7 +28,7 @@ import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
@@ -37,43 +37,41 @@ import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.listener.adapter.RecordFilterStrategy
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
-
import java.time.Duration
-@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper,
- ObjectMapper])
+@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
-class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
+class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
+ NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
+ RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
def static clientTopic = 'client-topic'
- def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
+ def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
legacyEventKafkaConsumer.subscribe([clientTopic])
- and: 'consumer record for batch event'
- def consumerRecordIn = createConsumerRecord(batchEventType)
- when: 'the batch event is consumed and published to client specified topic'
- asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
+ and: 'consumer record for data operation event'
+ def consumerRecordIn = createConsumerRecord(dataOperationType)
+ when: 'the data operation event is consumed and published to client specified topic'
+ asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
then: 'verifying consumed event operationID is same as published event operationID'
- def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
- def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
+ def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
+ def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
assert operationIdIn == operationIdOut
}
@@ -85,14 +83,14 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
then: 'the event is #description'
assert result == expectedResult
where: 'filter the event based on the eventType #eventType'
- description | eventType || expectedResult
- 'not filtered(the consumer will see the event)' | batchEventType || false
- 'filtered(the consumer will not see the event)' | 'wrongType' || true
+ description | eventType || expectedResult
+ 'not filtered(the consumer will see the event)' | dataOperationType || false
+ 'filtered(the consumer will not see the event)' | 'wrongType' || true
}
def createConsumerRecord(eventTypeAsString) {
- def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class)
+ def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
def eventTarget = SerializationUtils.serialize(clientTopic)
def eventType = SerializationUtils.serialize(eventTypeAsString)
def eventId = SerializationUtils.serialize('12345')
diff --git a/cps-ncmp-service/src/test/resources/batchDataEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
index 49eb273f58..42268c0ef3 100644
--- a/cps-ncmp-service/src/test/resources/batchDataEvent.json
+++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
@@ -1,15 +1,15 @@
{
- "event":{
- "batch-responses":[
+ "data":{
+ "responses":[
{
"operationId":"1",
"ids":[
"123",
"124"
],
- "status-code":1,
- "status-message":"Batch operation success on the above cmhandle ids ",
- "data":{
+ "statusCode":1,
+ "statusMessage":"Batch operation success on the above cmhandle ids ",
+ "responseContent":{
"ietf-netconf-monitoring:netconf-state":{
"schemas":{
"schema":[
@@ -26,20 +26,6 @@
}
}
}
- },
- {
- "operationId":"101",
- "ids":[
- "456",
- "457"
- ],
- "status-code":101,
- "status-message":"cmHandle(s) do not exist",
- "data":{
- "error":{
- "message":"cmHandle(s) do not exist"
- }
- }
}
]
}