aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBruno Sakoto <bruno.sakoto@bell.ca>2021-06-04 07:49:14 -0400
committerBruno Sakoto <bruno.sakoto@bell.ca>2021-07-05 18:37:15 -0400
commitc9b99347c7c425fce7a1f5f3c7e2ac500f2f0c5c (patch)
tree199b49d2db987f2c630de124356a48b1bd97657a
parent8b2193b7ed06e2ee6a90f7986921e72ca70ad90f (diff)
Add kafka listener for data updated events
See "Running via Docker Compose" section from README.md file to have an example of event processing Issue-ID: CPS-371 Signed-off-by: Bruno Sakoto <bruno.sakoto@bell.ca> Change-Id: Id3abfa32fb04e07102a5f28e6e43a9b533391188
-rwxr-xr-xREADME.md42
-rwxr-xr-xdocker-compose.yml30
-rwxr-xr-xpom.xml63
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java34
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java73
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java129
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java44
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java56
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/web/QueryController.java (renamed from src/main/java/org/onap/cps/temporal/controller/QueryController.java)2
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/NetworkData.java9
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java6
-rw-r--r--src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java27
-rw-r--r--src/main/java/org/onap/cps/temporal/service/ServiceException.java34
-rw-r--r--src/main/resources/application-sasl-ssl-kafka.yml31
-rwxr-xr-xsrc/main/resources/application.yml28
-rw-r--r--src/main/resources/logback.xml43
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy124
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy117
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy72
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy131
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy (renamed from src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy)4
-rw-r--r--src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy3
-rw-r--r--src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy37
-rw-r--r--src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java2
-rw-r--r--src/test/resources/application.yml36
25 files changed, 1126 insertions, 51 deletions
diff --git a/README.md b/README.md
index d7d200e..42e1d5f 100755
--- a/README.md
+++ b/README.md
@@ -20,12 +20,48 @@ mvn clean install -Pcps-temporal-docker -Ddocker.repository.push=
## Running via Docker Compose
`docker-compose.yml` file is provided to be run with `docker-compose` tool and local image previously built.
-It starts both Postgres Timescale database and CPS Temporal service.
+It starts following services:
-Execute following command from project root folder:
+* CPS Temporal service (cps-temporal)
+* Postgres Timescale database (timescaledb)
+* Kafka broker (zookeeper and kafka)
+
+Execute following command from project root folder to start all services:
+
+```bash
+docker-compose up
+```
+
+Then, use `kafkacat` tool to produce a data updated event into the Kafka topic:
+
+```bash
+docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.cfg-state-events -D/ -P <<EOF
+{
+ "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
+ "id": "38aa6cc6-264d-4ede-b534-18f5c1f403ea",
+ "source": "urn:cps:org.onap.cps",
+ "type": "org.onap.cps.data-updated-event",
+ "content": {
+ "observedTimestamp": "2021-06-09T13:00:00.123-0400",
+ "dataspaceName": "my-dataspace",
+ "schemaSetName": "my-schema-set",
+ "anchorName": "my-anchor",
+ "data": {
+ "interface": {
+ "name": "itf-1",
+ "status": "up"
+ }
+ }
+ }
+}
+EOF
+```
+
+Finally, verify that CPS Temporal data is persisted as expected:
```bash
-docker-compose up -d
+psql -h localhost -p 5433 -d cpstemporaldb -U cpstemporal -c \
+ "select * from network_data order by created_timestamp desc limit 1"
```
## Alternative local db setup
diff --git a/docker-compose.yml b/docker-compose.yml
index fae1cbc..fe863fd 100755
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,5 +1,6 @@
# ============LICENSE_START=======================================================
# Copyright (C) 2021 Nordix Foundation.
+# Modifications Copyright (C) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -28,16 +29,39 @@ services:
DB_PORT: 5432
DB_USERNAME: cpstemporal
DB_PASSWORD: cpstemporal
+ KAFKA_BOOTSTRAP_SERVER: kafka:9092
+
restart: unless-stopped
depends_on:
- timescaledb
+ - kafka
timescaledb:
container_name: timescaledb
image: timescale/timescaledb:2.1.1-pg13
ports:
- - '5432:5432'
+ - '5433:5432'
environment:
POSTGRES_DB: cpstemporaldb
- POSTGRES_USER: ${DB_USERNAME}
- POSTGRES_PASSWORD: ${DB_PASSWORD}
+ POSTGRES_USER: cpstemporal
+ POSTGRES_PASSWORD: cpstemporal
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.1.1
+ container_name: zookeeper
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: confluentinc/cp-kafka:6.1.1
+ container_name: kafka
+ ports:
+ - "19092:19092"
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
diff --git a/pom.xml b/pom.xml
index c38b565..7f2318b 100755
--- a/pom.xml
+++ b/pom.xml
@@ -45,26 +45,29 @@
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format>
<minimum-coverage>0.8</minimum-coverage>
<!-- Application dependencies versions -->
- <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version>
+ <cps.events.version>1.1.0-SNAPSHOT</cps.events.version>
<hibernate-types.version>2.10.0</hibernate-types.version>
<liquibase-core.version>4.3.2</liquibase-core.version>
+ <lombok.version>1.18.20</lombok.version>
+ <mapstruct.version>1.4.2.Final</mapstruct.version>
+ <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version>
<!-- Tests dependencies versions -->
- <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version>
+ <archunit-junit5.version>0.18.0</archunit-junit5.version>
<groovy.version>3.0.7</groovy.version>
<junit-jupiter.version>1.15.2</junit-jupiter.version>
+ <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version>
<testcontainers-postgresql.version>1.15.2</testcontainers-postgresql.version>
- <archunit-junit5.version>0.18.0</archunit-junit5.version>
<!-- Plugins and plugins dependencies versions -->
- <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version>
+ <bug-pattern.version>1.5.0</bug-pattern.version>
+ <cps.checkstyle.version>1.0.1</cps.checkstyle.version>
+ <cps.spotbugs.version>1.0.1</cps.spotbugs.version>
<gmavenplus-plugin.version>1.12.1</gmavenplus-plugin.version>
<jib-maven-plugin.version>3.0.0</jib-maven-plugin.version>
<oparent.version>3.2.0</oparent.version>
- <cps.checkstyle.version>1.0.1</cps.checkstyle.version>
- <cps.spotbugs.version>1.0.1</cps.spotbugs.version>
<spotbugs-maven-plugin.version>4.1.3</spotbugs-maven-plugin.version>
- <spotbugs.version>4.2.0</spotbugs.version>
<spotbugs.slf4j.version>1.8.0-beta4</spotbugs.slf4j.version>
- <bug-pattern.version>1.5.0</bug-pattern.version>
+ <spotbugs.version>4.2.0</spotbugs.version>
+ <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version>
</properties>
<dependencyManagement>
@@ -117,6 +120,20 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mapstruct</groupId>
+ <artifactId>mapstruct</artifactId>
+ <version>${mapstruct.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.cps</groupId>
+ <artifactId>cps-events</artifactId>
+ <version>${cps.events.version}</version>
+ </dependency>
<!-- Runtime dependencies-->
<dependency>
<groupId>org.postgresql</groupId>
@@ -163,6 +180,17 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>1.15.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit-junit5</artifactId>
<version>${archunit-junit5.version}</version>
@@ -173,6 +201,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ </path>
+ <path>
+ <groupId>org.mapstruct</groupId>
+ <artifactId>mapstruct-processor</artifactId>
+ <version>${mapstruct.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot-maven-plugin.version}</version>
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java
new file mode 100644
index 0000000..a9d1ce2
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+/**
+ * Class representing a listener exception related to system event error.
+ */
+public class EventListenerException extends RuntimeException {
+
+ public EventListenerException(final String message) {
+ super(message);
+ }
+
+ public EventListenerException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java
new file mode 100644
index 0000000..df4e756
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Class representing an invalid event envelop exception.
+ */
+@Getter
+public class InvalidEventEnvelopException extends EventListenerException {
+
+ private final List<InvalidField> invalidFields = new ArrayList<>();
+
+ public InvalidEventEnvelopException(final String message) {
+ super(message);
+ }
+
+ public void addInvalidField(final InvalidField invalidField) {
+ this.invalidFields.add(invalidField);
+ }
+
+ public boolean hasInvalidFields() {
+ return ! this.invalidFields.isEmpty();
+ }
+
+ @Override
+ public String getMessage() {
+ return String.format("%s. invalidFields: %s", super.getMessage(), this.invalidFields.toString());
+ }
+
+ @AllArgsConstructor
+ @Getter
+ @EqualsAndHashCode
+ @ToString
+ public static class InvalidField implements Serializable {
+
+ private static final long serialVersionUID = -7118283787669377391L;
+
+ private final ErrorType errorType;
+ private final String fieldName;
+ private final String actualValue;
+ private final String expectedValue;
+
+ public enum ErrorType {
+ UNEXPECTED, MISSING
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
new file mode 100644
index 0000000..79c9d92
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
+import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
+import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
+import org.onap.cps.temporal.service.NetworkDataService;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+/**
+ * Listener for data updated events.
+ */
+@Component
+@Slf4j
+public class DataUpdatedEventListener {
+
+ private static final URI EVENT_SOURCE;
+
+ static {
+ try {
+ EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
+ } catch (final URISyntaxException e) {
+ throw new EventListenerException("Invalid URI for event source.", e);
+ }
+ }
+
+ private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
+
+ private final NetworkDataService networkDataService;
+ private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
+
+ /**
+ * Constructor.
+ */
+ public DataUpdatedEventListener(
+ final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
+ this.networkDataService = networkDataService;
+ this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
+ }
+
+ /**
+ * Consume the specified event.
+ *
+ * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
+ */
+ @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
+ public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+ log.debug("Receiving {} ...", cpsDataUpdatedEvent);
+
+ // Validate event envelop
+ validateEventEnvelop(cpsDataUpdatedEvent);
+
+ // Map event to entity
+ final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
+ log.debug("Persisting {} ...", networkData);
+
+ // Persist entity
+ final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
+ log.debug("Persisted {}", persistedNetworkData);
+
+ }
+
+ private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+ final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure");
+
+ // Schema
+ if (cpsDataUpdatedEvent.getSchema() == null) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING, "schema", null,
+ CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+ .value()));
+ }
+ // Id
+ if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING, "id", null, null));
+ }
+ // Source
+ if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "source",
+ cpsDataUpdatedEvent.getSource() != null
+ ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
+ }
+ // Type
+ if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
+ }
+
+ if (invalidEventEnvelopException.hasInvalidFields()) {
+ throw invalidEventEnvelopException;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java
new file mode 100644
index 0000000..7a4ee7f
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.listener.KafkaListenerErrorHandler;
+import org.springframework.kafka.listener.ListenerExecutionFailedException;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+/**
+ * Class responsible to handle errors for data updated event listener.
+ */
+@Component
+@Slf4j
+class DataUpdatedEventListenerErrorHandler implements KafkaListenerErrorHandler {
+
+ @Override
+ public Object handleError(final Message<?> message, final ListenerExecutionFailedException exception) {
+ log.error(
+ "Failed to process message {}. Error cause is {}.",
+ message,
+ exception.getCause() != null ? exception.getCause().toString() : null,
+ exception);
+ return exception;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
new file mode 100644
index 0000000..9ef25d5
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.event.model.Data;
+import org.onap.cps.temporal.domain.NetworkData;
+
+/**
+ * Mapper for data updated event schema.
+ */
+@Mapper(componentModel = "spring")
+public abstract class CpsDataUpdatedEventMapper {
+
+ private static final DateTimeFormatter ISO_TIMESTAMP_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ @Mapping(source = "content.observedTimestamp", target = "observedTimestamp")
+ @Mapping(source = "content.dataspaceName", target = "dataspace")
+ @Mapping(source = "content.schemaSetName", target = "schemaSet")
+ @Mapping(source = "content.anchorName", target = "anchor")
+ @Mapping(source = "content.data", target = "payload")
+ @Mapping(expression = "java(null)", target = "createdTimestamp")
+ public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent);
+
+ String map(final Data data) throws JsonProcessingException {
+ return data != null ? new ObjectMapper().writeValueAsString(data) : null;
+ }
+
+ OffsetDateTime map(final String timestamp) {
+ return timestamp != null ? OffsetDateTime.parse(timestamp, ISO_TIMESTAMP_FORMATTER) : null;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java
index d083dc9..fae95cc 100644
--- a/src/main/java/org/onap/cps/temporal/controller/QueryController.java
+++ b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java
@@ -16,7 +16,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.temporal.controller;
+package org.onap.cps.temporal.controller.web;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
index aa2ce95..1537e4a 100644
--- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
+++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
@@ -48,27 +48,30 @@ import org.hibernate.annotations.TypeDef;
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
public class NetworkData implements Serializable {
- private static final long serialVersionUID = -8032810412816532433L;
+ private static final long serialVersionUID = 8886477871334560919L;
@Id
+ @NotNull
@Column
private OffsetDateTime observedTimestamp;
@Id
+ @NotNull
@Column
private String dataspace;
@Id
+ @NotNull
@Column
private String anchor;
@NotNull
- @Column
+ @Column(updatable = false)
private String schemaSet;
@NotNull
@Type(type = "jsonb")
- @Column(columnDefinition = "jsonb")
+ @Column(columnDefinition = "jsonb", updatable = false)
private String payload;
@CreationTimestamp
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
index e9742e2..18c4dcf 100644
--- a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
+++ b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
@@ -32,10 +32,10 @@ import lombok.NoArgsConstructor;
@EqualsAndHashCode
public class NetworkDataId implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -1039604338648260766L;
+ private OffsetDateTime observedTimestamp;
private String dataspace;
private String anchor;
- private OffsetDateTime observedTimestamp;
-} \ No newline at end of file
+}
diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
index 2e7afb2..687ba85 100644
--- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
+++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
@@ -18,24 +18,39 @@
package org.onap.cps.temporal.service;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.temporal.domain.NetworkData;
+import org.onap.cps.temporal.domain.NetworkDataId;
import org.onap.cps.temporal.repository.NetworkDataRepository;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
/**
* Service implementation for Network Data.
*/
-@Component
+@Service
@Slf4j
public class NetworkDataServiceImpl implements NetworkDataService {
- @Autowired
- NetworkDataRepository networkDataRepository;
+ private final NetworkDataRepository networkDataRepository;
+
+ public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository) {
+ this.networkDataRepository = networkDataRepository;
+ }
@Override
public NetworkData addNetworkData(final NetworkData networkData) {
- return networkDataRepository.save(networkData);
+ final var savedNetworkData = networkDataRepository.save(networkData);
+ if (savedNetworkData.getCreatedTimestamp() == null) {
+ // Data already exists and can not be inserted
+ final var id =
+ new NetworkDataId(
+ networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
+ final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id);
+ throw new ServiceException(
+ "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
+ }
+ return savedNetworkData;
}
+
}
diff --git a/src/main/java/org/onap/cps/temporal/service/ServiceException.java b/src/main/java/org/onap/cps/temporal/service/ServiceException.java
new file mode 100644
index 0000000..b9d7184
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/service/ServiceException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.service;
+
+/**
+ * Class representing a service exception related to business error.
+ */
+public class ServiceException extends RuntimeException {
+
+ /**
+ * Instantiate a service exception with the specified message.
+ * @param message the exception message
+ */
+ public ServiceException(final String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/resources/application-sasl-ssl-kafka.yml b/src/main/resources/application-sasl-ssl-kafka.yml
new file mode 100644
index 0000000..fdc6458
--- /dev/null
+++ b/src/main/resources/application-sasl-ssl-kafka.yml
@@ -0,0 +1,31 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 Bell Canada.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+# Spring profile configuration for sasl ssl Kafka
+
+spring:
+ kafka:
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+ security:
+ protocol: SASL_SSL
+ ssl:
+ trust-store-type: JKS
+ trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
+ trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
+ properties:
+ sasl.mechanism: SCRAM-SHA-512
+ sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
+ ssl.endpoint.identification.algorithm:
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index d4c799c..5fe30b0 100755
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -27,10 +27,26 @@ spring:
change-log: classpath:/db/changelog/changelog-master.xml
jpa:
properties:
- hibernate:
- dialect: org.hibernate.dialect.PostgreSQLDialect
+ hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
+ hibernate.format_sql: true
+ hibernate.generate_statistics: false
+ kafka:
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+ security:
+ protocol: PLAINTEXT
+ consumer:
+ group-id: ${KAFKA_CONSUMER_GROUP_ID:cps-temporal-group}
+ # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
+ # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer
+ # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/
+ key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+ spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
-logging:
- level:
- org:
- springframework: INFO
+app:
+ listener:
+ data-updated:
+ topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
new file mode 100644
index 0000000..a75b7aa
--- /dev/null
+++ b/src/main/resources/logback.xml
@@ -0,0 +1,43 @@
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (c) 2021 Bell Canada.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+-->
+
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d - %highlight(%-5level) [%-20.20thread] %cyan(%logger{36}) - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Logger for cps classes -->
+ <logger name="org.onap.cps" level="info"/>
+
+ <!-- Logger for sql statements. Set to info to disable, debug to enable -->
+ <logger name="org.hibernate.SQL" level="info"/>
+
+ <!-- Logger for sql bindings. Set to info to disable, to trace to enable -->
+ <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="info"/>
+
+ <!-- Logger for hibernate statistics. Set to warn to disable, to info to enable -->
+ <logger name="org.hibernate.engine.internal.StatisticalLoggingSessionEventListener" level="warn"/>
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
new file mode 100644
index 0000000..4c362ad
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+*/
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import groovy.util.logging.Slf4j
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.temporal.repository.containers.TimescaleContainer
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.jdbc.core.JdbcTemplate
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import spock.lang.Shared
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Integration test specification for data updated event listener.
+ * This integration test is running database and kafka dependencies as docker containers.
+ */
+@SpringBootTest
+@Slf4j
+class DataUpdatedEventListenerIntegrationSpec extends Specification {
+
+ @Shared
+ def databaseTestContainer = TimescaleContainer.getInstance()
+
+ static kafkaTestContainer = new KafkaContainer()
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+ }
+
+ def setupSpec() {
+ databaseTestContainer.start()
+ kafkaTestContainer.start()
+ }
+
+ @Autowired
+ KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate
+
+ @Autowired
+ JdbcTemplate jdbcTemplate
+
+ @Value('${app.listener.data-updated.topic}')
+ String topic
+
+ // Define event data
+ def aTimestamp = EventFixtures.currentIsoTimestamp()
+ def aDataspace = 'my-dataspace'
+ def aSchemaSet = 'my-schema-set'
+ def anAnchor = 'my-anchor'
+
+ // Define sql queries for data validation
+ def sqlCount = "select count(*) from network_data"
+ def sqlSelect = "select * from network_data"
+ def sqlWhereClause =
+ ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
+ 'and dataspace = ? ' +
+ 'and schema_set = ? ' +
+ 'and anchor = ?'
+ def sqlCountWithConditions = sqlCount + sqlWhereClause
+ def sqlSelectWithConditions = sqlSelect + sqlWhereClause
+
+ def 'Processing a valid event'() {
+ given: "no event has been proceeded"
+ def initialRecordsCount =
+ jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
+ aTimestamp, aDataspace, aSchemaSet, anAnchor)
+ assert (initialRecordsCount == 0)
+ when: 'an event is produced'
+ def event =
+ EventFixtures.buildEvent(
+ timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
+ this.kafkaTemplate.send(topic, event)
+ then: 'the event is proceeded'
+ def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
+ pollingCondition.eventually {
+ def finalRecordsCount =
+ jdbcTemplate.queryForObject(
+ sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
+ assert (finalRecordsCount == 1)
+ }
+ Map<String, Object> result =
+ jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
+ log.debug("Data retrieved from db: {}", result)
+ }
+
+ def 'Processing an invalid event'() {
+ given: 'the number of network data records if known'
+ def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
+ when: 'an invalid event is produced'
+ this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
+ then: 'the event is not proceeded and no more network data record is created'
+ TimeUnit.SECONDS.sleep(3)
+ assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
+ }
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
+ }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
new file mode 100644
index 0000000..d3a407c
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
@@ -0,0 +1,117 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException
+import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper
+import org.onap.cps.temporal.service.NetworkDataService
+import spock.lang.Specification
+
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED
+
+/**
+ * Test specification for data updated event listener.
+ */
+class DataUpdatedEventListenerSpec extends Specification {
+
+ // Define event data
+ def anEventType = 'my-event-type'
+ def anEventSource = new URI('my-event-source')
+ def aTimestamp = EventFixtures.currentIsoTimestamp()
+ def aDataspace = 'my-dataspace'
+ def aSchemaSet = 'my-schema-set'
+ def anAnchor = 'my-anchor'
+ def aDataName = 'my-data-name'
+ def aDataValue = 'my-data-value'
+
+ // Define service mock
+ def mockService = Mock(NetworkDataService)
+
+ // Define mapper
+ def mapper = Mappers.getMapper(CpsDataUpdatedEventMapper.class)
+
+ // Define listener under test
+ def objectUnderTest = new DataUpdatedEventListener(mockService, mapper)
+
+ def 'Event message consumption'() {
+ when: 'an event is received'
+ def event =
+ EventFixtures.buildEvent(
+ timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor,
+ dataName: aDataName, dataValue: aDataValue)
+ objectUnderTest.consume(event)
+ then: 'network data service is requested to persisted the data change'
+ 1 * mockService.addNetworkData(
+ {
+ it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
+ && it.getDataspace() == aDataspace
+ && it.getSchemaSet() == aSchemaSet
+ && it.getAnchor() == anAnchor
+ && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue)
+ && it.getCreatedTimestamp() == null
+ }
+ )
+ }
+
+ def 'Event message consumption fails because of missing envelop'() {
+ when: 'an event without envelop information is received'
+ def invalidEvent = new CpsDataUpdatedEvent().withSchema(null)
+ objectUnderTest.consume(invalidEvent)
+ then: 'an exception is thrown with 4 invalid fields'
+ def e = thrown(InvalidEventEnvelopException)
+ e.getInvalidFields().size() == 4
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING,"schema", null,
+ CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+ .value()))
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING, "id", null, null))
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString()))
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "type", null, EventFixtures.defaultEventType))
+ e.getMessage().contains(e.getInvalidFields().toString())
+ }
+
+ def 'Event message consumption fails because of invalid envelop'() {
+ when: 'an event with an invalid envelop is received'
+ def invalidEvent =
+ new CpsDataUpdatedEvent()
+ .withId('my-id').withSource(anEventSource).withType(anEventType)
+ objectUnderTest.consume(invalidEvent)
+ then: 'an exception is thrown with 2 invalid fields'
+ def e = thrown(InvalidEventEnvelopException)
+ e.getInvalidFields().size() == 2
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType))
+ e.getInvalidFields().contains(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "source", anEventSource.toString(),
+ EventFixtures.defaultEventSource.toString()))
+ }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
new file mode 100644
index 0000000..44a28de
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import org.onap.cps.event.model.Content
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.event.model.Data
+
+import java.time.OffsetDateTime
+import java.time.format.DateTimeFormatter
+
+/**
+ * This class contains utility fixtures methods for building and manipulating event data.
+ */
+class EventFixtures {
+
+ static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+ static String defaultEventType = 'org.onap.cps.data-updated-event'
+ static URI defaultEventSource = new URI('urn:cps:org.onap.cps')
+
+ static CpsDataUpdatedEvent buildEvent(final Map map) {
+ CpsDataUpdatedEvent event =
+ new CpsDataUpdatedEvent()
+ .withId(
+ map.id != null ? map.id.toString() : UUID.randomUUID().toString())
+ .withType(
+ map.eventType != null ? map.eventType.toString() : defaultEventType)
+ .withSource(
+ map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource)
+ .withContent(
+ new Content()
+ .withObservedTimestamp(
+ map.timestamp != null ? map.timestamp.toString() : currentTimestamp())
+ .withDataspaceName(
+ map.dataspace != null ? map.dataspace.toString() : 'a-dataspace')
+ .withSchemaSetName(
+ map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set')
+ .withAnchorName(
+ map.anchor != null ? map.anchor.toString() : 'an-anchor')
+ .withData(
+ new Data().withAdditionalProperty(
+ map.dataName != null ? map.dataName.toString() : 'a-data-name',
+ map.dataValue != null ? map.dataValue : 'a-data-value')))
+
+ return event
+ }
+
+ static String currentIsoTimestamp() {
+ return isoTimestampFormatter.format(OffsetDateTime.now())
+ }
+
+ static OffsetDateTime toOffsetDateTime(String timestamp) {
+ return OffsetDateTime.parse(timestamp, isoTimestampFormatter)
+ }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
new file mode 100644
index 0000000..132ff6d
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
@@ -0,0 +1,131 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.model
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.Content
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.event.model.Data
+import org.onap.cps.temporal.domain.NetworkData
+import spock.lang.Specification
+
+import java.time.OffsetDateTime
+import java.time.format.DateTimeFormatter
+
+/**
+ * Test specification for data updated event mapper.
+ */
+class CpsDataUpdatedEventMapperSpec extends Specification {
+
+ def objectUnderTest = Mappers.getMapper(CpsDataUpdatedEventMapper.class);
+
+ def 'Mapping a null event'() {
+ given: 'a null event'
+ def event = null
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'the result entity is null'
+ result == null
+ }
+
+ def 'Mapping an event whose properties are null'() {
+ given: 'an event whose properties are null'
+ def event = new CpsDataUpdatedEvent()
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'the result entity is not null'
+ result != null
+ and: 'all result entity properties are null'
+ assertEntityPropertiesAreNull(result)
+ }
+
+ def 'Mapping an event whose content properties are null'() {
+ given: 'an event whose content properties are null'
+ def event = new CpsDataUpdatedEvent().withContent(new Content())
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'the result entity is not null'
+ result != null
+ and: 'all result entity properties are null'
+ assertEntityPropertiesAreNull(result)
+ }
+
+ def 'Mapping an event whose content data is empty'() {
+ given: 'an event whose content data is empty'
+ def event = new CpsDataUpdatedEvent().withContent(new Content().withData(new Data()))
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'the result entity is not null'
+ result != null
+ and: 'the result entity payload is an empty json '
+ result.getPayload() == "{}"
+ }
+
+ def 'Mapping an event whose content data is invalid'() {
+ given: 'an event whose content data is invalid'
+ def event =
+ new CpsDataUpdatedEvent().withContent(new Content().withData(
+ new Data().withAdditionalProperty(null, null)))
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'an runtime exception is thrown'
+ def e = thrown(RuntimeException)
+ e.getCause() instanceof JsonProcessingException
+ }
+
+ def 'Mapping a valid complete event'() {
+ given: 'a valid complete event'
+ def isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+ def aDataName = 'a-data-name'
+ def aDataValue = 'a-data-value'
+ def event =
+ new CpsDataUpdatedEvent()
+ .withContent(
+ new Content()
+ .withObservedTimestamp(isoTimestampFormatter.format(OffsetDateTime.now()))
+ .withDataspaceName('a-dataspace')
+ .withSchemaSetName('a-schema-set')
+ .withAnchorName('an-anchor')
+ .withData(new Data().withAdditionalProperty(aDataName, aDataValue)))
+ when: 'the event is mapped to an entity'
+ NetworkData result = objectUnderTest.eventToEntity(event)
+ then: 'the result entity is not null'
+ result != null
+ and: 'all result entity properties are the ones from the event'
+ result.getObservedTimestamp() ==
+ OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
+ result.getDataspace() == event.getContent().getDataspaceName()
+ result.getSchemaSet() == event.getContent().getSchemaSetName()
+ result.getAnchor() == event.getContent().getAnchorName()
+ result.getPayload().contains(aDataValue)
+ result.getPayload().contains(aDataValue)
+ result.getCreatedTimestamp() == null
+ }
+
+ private void assertEntityPropertiesAreNull(NetworkData networkData) {
+ assert networkData.getObservedTimestamp() == null
+ assert networkData.getDataspace() == null
+ assert networkData.getSchemaSet() == null
+ assert networkData.getAnchor() == null
+ assert networkData.getPayload() == null
+ assert networkData.getCreatedTimestamp() == null
+ }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy
index f718bf4..c834f38 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy
@@ -16,7 +16,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.temporal.controller
+package org.onap.cps.temporal.controller.web
import spock.lang.Specification
@@ -34,4 +34,4 @@ class QueryControllerSpec extends Specification {
! response.empty
}
-} \ No newline at end of file
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
index ec976ee..f66b35e 100644
--- a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
@@ -29,6 +29,9 @@ import spock.lang.Specification
import java.time.OffsetDateTime
+/**
+ * Test specification for network data repository.
+ */
@SpringBootTest
@Testcontainers
class NetworkDataRepositorySpec extends Specification {
diff --git a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
index 70ac2bc..9847f54 100644
--- a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
@@ -18,28 +18,51 @@
package org.onap.cps.temporal.service
+import org.onap.cps.temporal.domain.NetworkDataId
+
import java.time.OffsetDateTime
import org.onap.cps.temporal.domain.NetworkData
import org.onap.cps.temporal.repository.NetworkDataRepository
import spock.lang.Specification
+/**
+ * Test specification for network data service.
+ */
class NetworkDataServiceImplSpec extends Specification {
- def objectUnderTest = new NetworkDataServiceImpl()
-
def mockNetworkDataRepository = Mock(NetworkDataRepository)
+ def objectUnderTest = new NetworkDataServiceImpl(mockNetworkDataRepository)
+
def networkData = new NetworkData()
- def setup() {
- objectUnderTest.networkDataRepository = mockNetworkDataRepository
+ def 'Add network data successfully.'() {
+ given: 'network data repository is persisting network data it is asked to save'
+ def persistedNetworkData = new NetworkData()
+ persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now())
+ mockNetworkDataRepository.save(networkData) >> persistedNetworkData
+ when: 'a new network data is added'
+ def result = objectUnderTest.addNetworkData(networkData)
+ then: 'result network data is the one that has been persisted'
+ result == persistedNetworkData
+ result.getCreatedTimestamp() != null
+ networkData.getCreatedTimestamp() == null
}
- def 'Add network data in timeseries database.'() {
+ def 'Add network data fails because already added'() {
+ given: 'network data repository is not able to create data it is asked to persist ' +
+ 'and reveals it with null created timestamp on network data entity'
+ def persistedNetworkData = new NetworkData()
+ persistedNetworkData.setCreatedTimestamp(null)
+ mockNetworkDataRepository.save(networkData) >> persistedNetworkData
+ and: 'existing data can be retrieved'
+ def existing = new NetworkData()
+ existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1))
+ mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing)
when: 'a new network data is added'
objectUnderTest.addNetworkData(networkData)
- then: ' repository service is called with the correct parameters'
- 1 * mockNetworkDataRepository.save(networkData)
+ then: 'network service exception is thrown'
+ thrown(ServiceException)
}
}
diff --git a/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java b/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java
index d47e8a5..a70e914 100644
--- a/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java
+++ b/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java
@@ -70,4 +70,4 @@ public class LayeredArchitectureTest {
.should().onlyHaveDependentClassesThat()
.resideInAnyPackage(SERVICE_PACKAGE, REPOSITORY_PACKAGE);
-} \ No newline at end of file
+}
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index afaff6c..3ac13a9 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -20,17 +20,37 @@ server:
spring:
datasource:
url: ${DB_URL}
- password: ${DB_PASSWORD}
username: ${DB_USERNAME}
+ password: ${DB_PASSWORD}
liquibase:
change-log: classpath:/db/changelog/changelog-master.xml
jpa:
- open-in-view: false
properties:
- hibernate:
- dialect: org.hibernate.dialect.PostgreSQLDialect
+ hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
+ hibernate.format_sql: true
+ hibernate.generate_statistics: false
+ kafka:
+ bootstrap-servers: localhost:9092
+ security:
+ protocol: PLAINTEXT
+ consumer:
+ group-id: cps-temporal-group
+ # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
+ # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer
+ # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/
+ key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ auto-offset-reset: earliest
+ properties:
+ spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+ spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
+ # Following is not cps-temporal configuration. It is configuration for the producer used for integration tests
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
-logging:
- level:
- org:
- springframework: INFO
+app:
+ listener:
+ data-updated:
+ topic: cps.cfg-state-events