diff options
author | Bruno Sakoto <bruno.sakoto@bell.ca> | 2021-08-20 18:59:25 -0400 |
---|---|---|
committer | Bruno Sakoto <bruno.sakoto@bell.ca> | 2021-08-25 06:37:49 -0400 |
commit | e36f8e376dbdb8cf10d40e67a89a712bca688601 (patch) | |
tree | 95f7a5a01e3f9f7f6ac4bf98e7bd52a637e128a7 | |
parent | 976fe54b0782023ab496904481dc0cc9f4eaf4d4 (diff) |
Prepare for next event schema version
Issue-ID: CPS-584, CPS-459
Signed-off-by: Bruno Sakoto <bruno.sakoto@bell.ca>
Change-Id: I02ce027a4222fe3178d854628a52626c8184d987
7 files changed, 28 insertions, 14 deletions
@@ -55,9 +55,9 @@ docker-compose up Then, use `kafkacat` tool to produce a data updated event into the Kafka topic: ```bash -docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.cfg-state-events -D/ -P <<EOF +docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.data-updated-events -D/ -P <<EOF { - "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT", + "schema": "urn:cps:org.onap.cps:data-updated-event-schema:v1", "id": "38aa6cc6-264d-4ede-b534-18f5c1f403ea", "source": "urn:cps:org.onap.cps", "type": "org.onap.cps.data-updated-event", @@ -136,7 +136,7 @@ <dependency> <groupId>org.onap.cps</groupId> <artifactId>cps-events</artifactId> - <version>1.1.0</version> + <version>1.1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springdoc</groupId> diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java index 2e4b88e..5fce94e 100644 --- a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java @@ -40,6 +40,7 @@ import org.springframework.util.StringUtils; @Slf4j public class DataUpdatedEventListener { + private static final String EVENT_SCHEMA_URN_PREFIX = "urn:cps:org.onap.cps:data-updated-event-schema:v"; private static final URI EVENT_SOURCE; static { @@ -93,12 +94,13 @@ public class DataUpdatedEventListener { new InvalidEventEnvelopException("Validation failure", cpsDataUpdatedEvent); // Validate schema - if (cpsDataUpdatedEvent.getSchema() == null) { + if (cpsDataUpdatedEvent.getSchema() == null + || !cpsDataUpdatedEvent.getSchema().toString().startsWith(EVENT_SCHEMA_URN_PREFIX)) { invalidEventEnvelopException.addInvalidField( new InvalidEventEnvelopException.InvalidField( - MISSING, "schema", null, - CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT - .value())); + UNEXPECTED, "schema", + cpsDataUpdatedEvent.getSchema() != null ? cpsDataUpdatedEvent.getSchema().toString() : null, + EVENT_SCHEMA_URN_PREFIX + "99")); } // Validate id if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a3b1cd8..7db9510 100755 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -52,11 +52,12 @@ spring: spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent + spring.json.use.type.headers: false app: listener: data-updated: - topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events} + topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events} query: response: max-page-size: 10000 diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy index 35ed977..055147f 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy @@ -33,8 +33,11 @@ import static org.onap.cps.temporal.controller.event.listener.exception.InvalidE */ class DataUpdatedEventListenerSpec extends Specification { + public static final String EXPECTED_SCHEMA_EXCEPTION_MESSAGE = 'urn:cps:org.onap.cps:data-updated-event-schema:v99' + // Define event data def anEventType = 'my-event-type' + def anEventSchema = new URI('my-event-schema') def anEventSource = new URI('my-event-source') def aTimestamp = EventFixtures.currentIsoTimestamp() def aDataspace = 'my-dataspace' @@ -82,9 +85,7 @@ class DataUpdatedEventListenerSpec extends Specification { e.getInvalidFields().size() == 4 e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - MISSING,"schema", null, - CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT - .value())) + UNEXPECTED,"schema", null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( MISSING, "id", null, null)) @@ -101,12 +102,19 @@ class DataUpdatedEventListenerSpec extends Specification { when: 'an event with an invalid envelop is received' def invalidEvent = new CpsDataUpdatedEvent() - .withId('my-id').withSource(anEventSource).withType(anEventType) + .withId('my-id') + .withSchema(anEventSchema) + .withSource(anEventSource) + .withType(anEventType) objectUnderTest.consume(invalidEvent) then: 'an exception is thrown with 2 invalid fields' def e = thrown(InvalidEventEnvelopException) e.getCpsDataUpdatedEvent() == invalidEvent - e.getInvalidFields().size() == 2 + e.getInvalidFields().size() == 3 + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "schema", anEventSchema.toString(), + EXPECTED_SCHEMA_EXCEPTION_MESSAGE)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType)) diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy index 44a28de..7c4dee6 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy @@ -32,11 +32,14 @@ class EventFixtures { static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") static String defaultEventType = 'org.onap.cps.data-updated-event' + static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') static URI defaultEventSource = new URI('urn:cps:org.onap.cps') static CpsDataUpdatedEvent buildEvent(final Map map) { CpsDataUpdatedEvent event = new CpsDataUpdatedEvent() + .withSchema( + map.eventSchema != null ? new URI(map.eventSchema.toString()) : defaultEventSchema) .withId( map.id != null ? map.id.toString() : UUID.randomUUID().toString()) .withType( diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 6765057..9bdacbe 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -60,7 +60,7 @@ spring: app: listener: data-updated: - topic: cps.cfg-state-events + topic: cps.data-updated-events query: response: max-page-size: 20 |