summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xREADME.md4
-rwxr-xr-xpom.xml2
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java10
-rwxr-xr-xsrc/main/resources/application.yml3
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy18
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy3
-rw-r--r--src/test/resources/application.yml2
7 files changed, 28 insertions, 14 deletions
diff --git a/README.md b/README.md
index 6e6ce22..bf2210f 100755
--- a/README.md
+++ b/README.md
@@ -55,9 +55,9 @@ docker-compose up
Then, use `kafkacat` tool to produce a data updated event into the Kafka topic:
```bash
-docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.cfg-state-events -D/ -P <<EOF
+docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.data-updated-events -D/ -P <<EOF
{
- "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
+ "schema": "urn:cps:org.onap.cps:data-updated-event-schema:v1",
"id": "38aa6cc6-264d-4ede-b534-18f5c1f403ea",
"source": "urn:cps:org.onap.cps",
"type": "org.onap.cps.data-updated-event",
diff --git a/pom.xml b/pom.xml
index 5bcb116..fa44f35 100755
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
<dependency>
<groupId>org.onap.cps</groupId>
<artifactId>cps-events</artifactId>
- <version>1.1.0</version>
+ <version>1.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
index 2e4b88e..5fce94e 100644
--- a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
@@ -40,6 +40,7 @@ import org.springframework.util.StringUtils;
@Slf4j
public class DataUpdatedEventListener {
+ private static final String EVENT_SCHEMA_URN_PREFIX = "urn:cps:org.onap.cps:data-updated-event-schema:v";
private static final URI EVENT_SOURCE;
static {
@@ -93,12 +94,13 @@ public class DataUpdatedEventListener {
new InvalidEventEnvelopException("Validation failure", cpsDataUpdatedEvent);
// Validate schema
- if (cpsDataUpdatedEvent.getSchema() == null) {
+ if (cpsDataUpdatedEvent.getSchema() == null
+ || !cpsDataUpdatedEvent.getSchema().toString().startsWith(EVENT_SCHEMA_URN_PREFIX)) {
invalidEventEnvelopException.addInvalidField(
new InvalidEventEnvelopException.InvalidField(
- MISSING, "schema", null,
- CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
- .value()));
+ UNEXPECTED, "schema",
+ cpsDataUpdatedEvent.getSchema() != null ? cpsDataUpdatedEvent.getSchema().toString() : null,
+ EVENT_SCHEMA_URN_PREFIX + "99"));
}
// Validate id
if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a3b1cd8..7db9510 100755
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -52,11 +52,12 @@ spring:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
+ spring.json.use.type.headers: false
app:
listener:
data-updated:
- topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
+ topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events}
query:
response:
max-page-size: 10000
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
index 35ed977..055147f 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
@@ -33,8 +33,11 @@ import static org.onap.cps.temporal.controller.event.listener.exception.InvalidE
*/
class DataUpdatedEventListenerSpec extends Specification {
+ public static final String EXPECTED_SCHEMA_EXCEPTION_MESSAGE = 'urn:cps:org.onap.cps:data-updated-event-schema:v99'
+
// Define event data
def anEventType = 'my-event-type'
+ def anEventSchema = new URI('my-event-schema')
def anEventSource = new URI('my-event-source')
def aTimestamp = EventFixtures.currentIsoTimestamp()
def aDataspace = 'my-dataspace'
@@ -82,9 +85,7 @@ class DataUpdatedEventListenerSpec extends Specification {
e.getInvalidFields().size() == 4
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- MISSING,"schema", null,
- CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
- .value()))
+ UNEXPECTED,"schema", null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
MISSING, "id", null, null))
@@ -101,12 +102,19 @@ class DataUpdatedEventListenerSpec extends Specification {
when: 'an event with an invalid envelop is received'
def invalidEvent =
new CpsDataUpdatedEvent()
- .withId('my-id').withSource(anEventSource).withType(anEventType)
+ .withId('my-id')
+ .withSchema(anEventSchema)
+ .withSource(anEventSource)
+ .withType(anEventType)
objectUnderTest.consume(invalidEvent)
then: 'an exception is thrown with 2 invalid fields'
def e = thrown(InvalidEventEnvelopException)
e.getCpsDataUpdatedEvent() == invalidEvent
- e.getInvalidFields().size() == 2
+ e.getInvalidFields().size() == 3
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "schema", anEventSchema.toString(),
+ EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType))
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
index 44a28de..7c4dee6 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
@@ -32,11 +32,14 @@ class EventFixtures {
static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
static String defaultEventType = 'org.onap.cps.data-updated-event'
+ static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
static URI defaultEventSource = new URI('urn:cps:org.onap.cps')
static CpsDataUpdatedEvent buildEvent(final Map map) {
CpsDataUpdatedEvent event =
new CpsDataUpdatedEvent()
+ .withSchema(
+ map.eventSchema != null ? new URI(map.eventSchema.toString()) : defaultEventSchema)
.withId(
map.id != null ? map.id.toString() : UUID.randomUUID().toString())
.withType(
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index 6765057..9bdacbe 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -60,7 +60,7 @@ spring:
app:
listener:
data-updated:
- topic: cps.cfg-state-events
+ topic: cps.data-updated-events
query:
response:
max-page-size: 20