diff options
author | Bruno Sakoto <bruno.sakoto@bell.ca> | 2021-06-04 07:49:14 -0400 |
---|---|---|
committer | Bruno Sakoto <bruno.sakoto@bell.ca> | 2021-07-05 18:37:15 -0400 |
commit | c9b99347c7c425fce7a1f5f3c7e2ac500f2f0c5c (patch) | |
tree | 199b49d2db987f2c630de124356a48b1bd97657a | |
parent | 8b2193b7ed06e2ee6a90f7986921e72ca70ad90f (diff) |
Add kafka listener for data updated events
See "Running via Docker Compose" section from README.md file to have an
example of event processing
Issue-ID: CPS-371
Signed-off-by: Bruno Sakoto <bruno.sakoto@bell.ca>
Change-Id: Id3abfa32fb04e07102a5f28e6e43a9b533391188
25 files changed, 1126 insertions, 51 deletions
@@ -20,12 +20,48 @@ mvn clean install -Pcps-temporal-docker -Ddocker.repository.push= ## Running via Docker Compose `docker-compose.yml` file is provided to be run with `docker-compose` tool and local image previously built. -It starts both Postgres Timescale database and CPS Temporal service. +It starts following services: -Execute following command from project root folder: +* CPS Temporal service (cps-temporal) +* Postgres Timescale database (timescaledb) +* Kafka broker (zookeeper and kafka) + +Execute following command from project root folder to start all services: + +```bash +docker-compose up +``` + +Then, use `kafkacat` tool to produce a data updated event into the Kafka topic: + +```bash +docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.cfg-state-events -D/ -P <<EOF +{ + "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT", + "id": "38aa6cc6-264d-4ede-b534-18f5c1f403ea", + "source": "urn:cps:org.onap.cps", + "type": "org.onap.cps.data-updated-event", + "content": { + "observedTimestamp": "2021-06-09T13:00:00.123-0400", + "dataspaceName": "my-dataspace", + "schemaSetName": "my-schema-set", + "anchorName": "my-anchor", + "data": { + "interface": { + "name": "itf-1", + "status": "up" + } + } + } +} +EOF +``` + +Finally, verify that CPS Temporal data is persisted as expected: ```bash -docker-compose up -d +psql -h localhost -p 5433 -d cpstemporaldb -U cpstemporal -c \ + "select * from network_data order by created_timestamp desc limit 1" ``` ## Alternative local db setup diff --git a/docker-compose.yml b/docker-compose.yml index fae1cbc..fe863fd 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,6 @@ # ============LICENSE_START======================================================= # Copyright (C) 2021 Nordix Foundation. +# Modifications Copyright (C) 2021 Bell Canada. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,16 +29,39 @@ services: DB_PORT: 5432 DB_USERNAME: cpstemporal DB_PASSWORD: cpstemporal + KAFKA_BOOTSTRAP_SERVER: kafka:9092 + restart: unless-stopped depends_on: - timescaledb + - kafka timescaledb: container_name: timescaledb image: timescale/timescaledb:2.1.1-pg13 ports: - - '5432:5432' + - '5433:5432' environment: POSTGRES_DB: cpstemporaldb - POSTGRES_USER: ${DB_USERNAME} - POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_USER: cpstemporal + POSTGRES_PASSWORD: cpstemporal + + zookeeper: + image: confluentinc/cp-zookeeper:6.1.1 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:6.1.1 + container_name: kafka + ports: + - "19092:19092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 @@ -45,26 +45,29 @@ <maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format> <minimum-coverage>0.8</minimum-coverage> <!-- Application dependencies versions --> - <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version> + <cps.events.version>1.1.0-SNAPSHOT</cps.events.version> <hibernate-types.version>2.10.0</hibernate-types.version> <liquibase-core.version>4.3.2</liquibase-core.version> + <lombok.version>1.18.20</lombok.version> + <mapstruct.version>1.4.2.Final</mapstruct.version> + <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version> <!-- Tests dependencies versions --> - <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version> + <archunit-junit5.version>0.18.0</archunit-junit5.version> <groovy.version>3.0.7</groovy.version> <junit-jupiter.version>1.15.2</junit-jupiter.version> + <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version> <testcontainers-postgresql.version>1.15.2</testcontainers-postgresql.version> - <archunit-junit5.version>0.18.0</archunit-junit5.version> <!-- Plugins and plugins dependencies versions --> - <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version> + <bug-pattern.version>1.5.0</bug-pattern.version> + <cps.checkstyle.version>1.0.1</cps.checkstyle.version> + <cps.spotbugs.version>1.0.1</cps.spotbugs.version> <gmavenplus-plugin.version>1.12.1</gmavenplus-plugin.version> <jib-maven-plugin.version>3.0.0</jib-maven-plugin.version> <oparent.version>3.2.0</oparent.version> - <cps.checkstyle.version>1.0.1</cps.checkstyle.version> - <cps.spotbugs.version>1.0.1</cps.spotbugs.version> <spotbugs-maven-plugin.version>4.1.3</spotbugs-maven-plugin.version> - <spotbugs.version>4.2.0</spotbugs.version> <spotbugs.slf4j.version>1.8.0-beta4</spotbugs.slf4j.version> - <bug-pattern.version>1.5.0</bug-pattern.version> + <spotbugs.version>4.2.0</spotbugs.version> + <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version> </properties> <dependencyManagement> @@ -117,6 +120,20 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency> + <dependency> + <groupId>org.mapstruct</groupId> + <artifactId>mapstruct</artifactId> + <version>${mapstruct.version}</version> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.onap.cps</groupId> + <artifactId>cps-events</artifactId> + <version>${cps.events.version}</version> + </dependency> <!-- Runtime dependencies--> <dependency> <groupId>org.postgresql</groupId> @@ -163,6 +180,17 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>1.15.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.tngtech.archunit</groupId> <artifactId>archunit-junit5</artifactId> <version>${archunit-junit5.version}</version> @@ -173,6 +201,25 @@ <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.1</version> + <configuration> + <annotationProcessorPaths> + <path> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + </path> + <path> + <groupId>org.mapstruct</groupId> + <artifactId>mapstruct-processor</artifactId> + <version>${mapstruct.version}</version> + </path> + </annotationProcessorPaths> + </configuration> + </plugin> + <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring-boot-maven-plugin.version}</version> diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java new file mode 100644 index 0000000..a9d1ce2 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.exception; + +/** + * Class representing a listener exception related to system event error. + */ +public class EventListenerException extends RuntimeException { + + public EventListenerException(final String message) { + super(message); + } + + public EventListenerException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java new file mode 100644 index 0000000..df4e756 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.exception; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** + * Class representing an invalid event envelop exception. + */ +@Getter +public class InvalidEventEnvelopException extends EventListenerException { + + private final List<InvalidField> invalidFields = new ArrayList<>(); + + public InvalidEventEnvelopException(final String message) { + super(message); + } + + public void addInvalidField(final InvalidField invalidField) { + this.invalidFields.add(invalidField); + } + + public boolean hasInvalidFields() { + return ! this.invalidFields.isEmpty(); + } + + @Override + public String getMessage() { + return String.format("%s. invalidFields: %s", super.getMessage(), this.invalidFields.toString()); + } + + @AllArgsConstructor + @Getter + @EqualsAndHashCode + @ToString + public static class InvalidField implements Serializable { + + private static final long serialVersionUID = -7118283787669377391L; + + private final ErrorType errorType; + private final String fieldName; + private final String actualValue; + private final String expectedValue; + + public enum ErrorType { + UNEXPECTED, MISSING + } + + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java new file mode 100644 index 0000000..79c9d92 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java @@ -0,0 +1,129 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka; + +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING; +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED; + +import java.net.URI; +import java.net.URISyntaxException; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.event.model.CpsDataUpdatedEvent; +import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException; +import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException; +import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper; +import org.onap.cps.temporal.service.NetworkDataService; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +/** + * Listener for data updated events. + */ +@Component +@Slf4j +public class DataUpdatedEventListener { + + private static final URI EVENT_SOURCE; + + static { + try { + EVENT_SOURCE = new URI("urn:cps:org.onap.cps"); + } catch (final URISyntaxException e) { + throw new EventListenerException("Invalid URI for event source.", e); + } + } + + private static final String EVENT_TYPE = "org.onap.cps.data-updated-event"; + + private final NetworkDataService networkDataService; + private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper; + + /** + * Constructor. + */ + public DataUpdatedEventListener( + final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) { + this.networkDataService = networkDataService; + this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper; + } + + /** + * Consume the specified event. + * + * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted. + */ + @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler") + public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + + log.debug("Receiving {} ...", cpsDataUpdatedEvent); + + // Validate event envelop + validateEventEnvelop(cpsDataUpdatedEvent); + + // Map event to entity + final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent); + log.debug("Persisting {} ...", networkData); + + // Persist entity + final var persistedNetworkData = this.networkDataService.addNetworkData(networkData); + log.debug("Persisted {}", persistedNetworkData); + + } + + private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + + final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure"); + + // Schema + if (cpsDataUpdatedEvent.getSchema() == null) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + MISSING, "schema", null, + CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT + .value())); + } + // Id + if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + MISSING, "id", null, null)); + } + // Source + if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "source", + cpsDataUpdatedEvent.getSource() != null + ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString())); + } + // Type + if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE)); + } + + if (invalidEventEnvelopException.hasInvalidFields()) { + throw invalidEventEnvelopException; + } + + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java new file mode 100644 index 0000000..7a4ee7f --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * Class responsible to handle errors for data updated event listener. + */ +@Component +@Slf4j +class DataUpdatedEventListenerErrorHandler implements KafkaListenerErrorHandler { + + @Override + public Object handleError(final Message<?> message, final ListenerExecutionFailedException exception) { + log.error( + "Failed to process message {}. Error cause is {}.", + message, + exception.getCause() != null ? exception.getCause().toString() : null, + exception); + return exception; + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java new file mode 100644 index 0000000..9ef25d5 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.onap.cps.event.model.CpsDataUpdatedEvent; +import org.onap.cps.event.model.Data; +import org.onap.cps.temporal.domain.NetworkData; + +/** + * Mapper for data updated event schema. + */ +@Mapper(componentModel = "spring") +public abstract class CpsDataUpdatedEventMapper { + + private static final DateTimeFormatter ISO_TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + @Mapping(source = "content.observedTimestamp", target = "observedTimestamp") + @Mapping(source = "content.dataspaceName", target = "dataspace") + @Mapping(source = "content.schemaSetName", target = "schemaSet") + @Mapping(source = "content.anchorName", target = "anchor") + @Mapping(source = "content.data", target = "payload") + @Mapping(expression = "java(null)", target = "createdTimestamp") + public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent); + + String map(final Data data) throws JsonProcessingException { + return data != null ? new ObjectMapper().writeValueAsString(data) : null; + } + + OffsetDateTime map(final String timestamp) { + return timestamp != null ? OffsetDateTime.parse(timestamp, ISO_TIMESTAMP_FORMATTER) : null; + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java index d083dc9..fae95cc 100644 --- a/src/main/java/org/onap/cps/temporal/controller/QueryController.java +++ b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java @@ -16,7 +16,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.temporal.controller; +package org.onap.cps.temporal.controller.web; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java index aa2ce95..1537e4a 100644 --- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java +++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java @@ -48,27 +48,30 @@ import org.hibernate.annotations.TypeDef; @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class) public class NetworkData implements Serializable { - private static final long serialVersionUID = -8032810412816532433L; + private static final long serialVersionUID = 8886477871334560919L; @Id + @NotNull @Column private OffsetDateTime observedTimestamp; @Id + @NotNull @Column private String dataspace; @Id + @NotNull @Column private String anchor; @NotNull - @Column + @Column(updatable = false) private String schemaSet; @NotNull @Type(type = "jsonb") - @Column(columnDefinition = "jsonb") + @Column(columnDefinition = "jsonb", updatable = false) private String payload; @CreationTimestamp diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java index e9742e2..18c4dcf 100644 --- a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java +++ b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java @@ -32,10 +32,10 @@ import lombok.NoArgsConstructor; @EqualsAndHashCode public class NetworkDataId implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -1039604338648260766L; + private OffsetDateTime observedTimestamp; private String dataspace; private String anchor; - private OffsetDateTime observedTimestamp; -}
\ No newline at end of file +} diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java index 2e7afb2..687ba85 100644 --- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java +++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java @@ -18,24 +18,39 @@ package org.onap.cps.temporal.service; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.onap.cps.temporal.domain.NetworkData; +import org.onap.cps.temporal.domain.NetworkDataId; import org.onap.cps.temporal.repository.NetworkDataRepository; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; /** * Service implementation for Network Data. */ -@Component +@Service @Slf4j public class NetworkDataServiceImpl implements NetworkDataService { - @Autowired - NetworkDataRepository networkDataRepository; + private final NetworkDataRepository networkDataRepository; + + public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository) { + this.networkDataRepository = networkDataRepository; + } @Override public NetworkData addNetworkData(final NetworkData networkData) { - return networkDataRepository.save(networkData); + final var savedNetworkData = networkDataRepository.save(networkData); + if (savedNetworkData.getCreatedTimestamp() == null) { + // Data already exists and can not be inserted + final var id = + new NetworkDataId( + networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor()); + final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id); + throw new ServiceException( + "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null))); + } + return savedNetworkData; } + } diff --git a/src/main/java/org/onap/cps/temporal/service/ServiceException.java b/src/main/java/org/onap/cps/temporal/service/ServiceException.java new file mode 100644 index 0000000..b9d7184 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/service/ServiceException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.service; + +/** + * Class representing a service exception related to business error. + */ +public class ServiceException extends RuntimeException { + + /** + * Instantiate a service exception with the specified message. + * @param message the exception message + */ + public ServiceException(final String message) { + super(message); + } + +} diff --git a/src/main/resources/application-sasl-ssl-kafka.yml b/src/main/resources/application-sasl-ssl-kafka.yml new file mode 100644 index 0000000..fdc6458 --- /dev/null +++ b/src/main/resources/application-sasl-ssl-kafka.yml @@ -0,0 +1,31 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 Bell Canada. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +# Spring profile configuration for sasl ssl Kafka + +spring: + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + security: + protocol: SASL_SSL + ssl: + trust-store-type: JKS + trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION} + trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD} + properties: + sasl.mechanism: SCRAM-SHA-512 + sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG} + ssl.endpoint.identification.algorithm: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d4c799c..5fe30b0 100755 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -27,10 +27,26 @@ spring: change-log: classpath:/db/changelog/changelog-master.xml jpa: properties: - hibernate: - dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.format_sql: true + hibernate.generate_statistics: false + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + security: + protocol: PLAINTEXT + consumer: + group-id: ${KAFKA_CONSUMER_GROUP_ID:cps-temporal-group} + # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers + # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer + # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/ + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent -logging: - level: - org: - springframework: INFO +app: + listener: + data-updated: + topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..a75b7aa --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,43 @@ +<!-- + ============LICENSE_START======================================================= + Copyright (c) 2021 Bell Canada. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= +--> + +<configuration> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d - %highlight(%-5level) [%-20.20thread] %cyan(%logger{36}) - %msg%n</pattern> + </encoder> + </appender> + + <!-- Logger for cps classes --> + <logger name="org.onap.cps" level="info"/> + + <!-- Logger for sql statements. Set to info to disable, debug to enable --> + <logger name="org.hibernate.SQL" level="info"/> + + <!-- Logger for sql bindings. Set to info to disable, to trace to enable --> + <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="info"/> + + <!-- Logger for hibernate statistics. Set to warn to disable, to info to enable --> + <logger name="org.hibernate.engine.internal.StatisticalLoggingSessionEventListener" level="warn"/> + + <root level="info"> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy new file mode 100644 index 0000000..4c362ad --- /dev/null +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy @@ -0,0 +1,124 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= +*/ + +package org.onap.cps.temporal.controller.event.listener.kafka + +import groovy.util.logging.Slf4j +import org.onap.cps.event.model.CpsDataUpdatedEvent +import org.onap.cps.temporal.repository.containers.TimescaleContainer +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.jdbc.core.JdbcTemplate +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.KafkaContainer +import spock.lang.Shared +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.util.concurrent.TimeUnit + +/** + * Integration test specification for data updated event listener. + * This integration test is running database and kafka dependencies as docker containers. + */ +@SpringBootTest +@Slf4j +class DataUpdatedEventListenerIntegrationSpec extends Specification { + + @Shared + def databaseTestContainer = TimescaleContainer.getInstance() + + static kafkaTestContainer = new KafkaContainer() + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + databaseTestContainer.start() + kafkaTestContainer.start() + } + + @Autowired + KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate + + @Autowired + JdbcTemplate jdbcTemplate + + @Value('${app.listener.data-updated.topic}') + String topic + + // Define event data + def aTimestamp = EventFixtures.currentIsoTimestamp() + def aDataspace = 'my-dataspace' + def aSchemaSet = 'my-schema-set' + def anAnchor = 'my-anchor' + + // Define sql queries for data validation + def sqlCount = "select count(*) from network_data" + def sqlSelect = "select * from network_data" + def sqlWhereClause = + ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' + + 'and dataspace = ? ' + + 'and schema_set = ? ' + + 'and anchor = ?' + def sqlCountWithConditions = sqlCount + sqlWhereClause + def sqlSelectWithConditions = sqlSelect + sqlWhereClause + + def 'Processing a valid event'() { + given: "no event has been proceeded" + def initialRecordsCount = + jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class, + aTimestamp, aDataspace, aSchemaSet, anAnchor) + assert (initialRecordsCount == 0) + when: 'an event is produced' + def event = + EventFixtures.buildEvent( + timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor) + this.kafkaTemplate.send(topic, event) + then: 'the event is proceeded' + def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2) + pollingCondition.eventually { + def finalRecordsCount = + jdbcTemplate.queryForObject( + sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor) + assert (finalRecordsCount == 1) + } + Map<String, Object> result = + jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor) + log.debug("Data retrieved from db: {}", result) + } + + def 'Processing an invalid event'() { + given: 'the number of network data records if known' + def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class) + when: 'an invalid event is produced' + this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null) + then: 'the event is not proceeded and no more network data record is created' + TimeUnit.SECONDS.sleep(3) + assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount) + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers) + } + +} diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy new file mode 100644 index 0000000..d3a407c --- /dev/null +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy @@ -0,0 +1,117 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka + +import org.mapstruct.factory.Mappers +import org.onap.cps.event.model.CpsDataUpdatedEvent +import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException +import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper +import org.onap.cps.temporal.service.NetworkDataService +import spock.lang.Specification + +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED + +/** + * Test specification for data updated event listener. + */ +class DataUpdatedEventListenerSpec extends Specification { + + // Define event data + def anEventType = 'my-event-type' + def anEventSource = new URI('my-event-source') + def aTimestamp = EventFixtures.currentIsoTimestamp() + def aDataspace = 'my-dataspace' + def aSchemaSet = 'my-schema-set' + def anAnchor = 'my-anchor' + def aDataName = 'my-data-name' + def aDataValue = 'my-data-value' + + // Define service mock + def mockService = Mock(NetworkDataService) + + // Define mapper + def mapper = Mappers.getMapper(CpsDataUpdatedEventMapper.class) + + // Define listener under test + def objectUnderTest = new DataUpdatedEventListener(mockService, mapper) + + def 'Event message consumption'() { + when: 'an event is received' + def event = + EventFixtures.buildEvent( + timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor, + dataName: aDataName, dataValue: aDataValue) + objectUnderTest.consume(event) + then: 'network data service is requested to persisted the data change' + 1 * mockService.addNetworkData( + { + it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp) + && it.getDataspace() == aDataspace + && it.getSchemaSet() == aSchemaSet + && it.getAnchor() == anAnchor + && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue) + && it.getCreatedTimestamp() == null + } + ) + } + + def 'Event message consumption fails because of missing envelop'() { + when: 'an event without envelop information is received' + def invalidEvent = new CpsDataUpdatedEvent().withSchema(null) + objectUnderTest.consume(invalidEvent) + then: 'an exception is thrown with 4 invalid fields' + def e = thrown(InvalidEventEnvelopException) + e.getInvalidFields().size() == 4 + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + MISSING,"schema", null, + CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT + .value())) + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + MISSING, "id", null, null)) + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString())) + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "type", null, EventFixtures.defaultEventType)) + e.getMessage().contains(e.getInvalidFields().toString()) + } + + def 'Event message consumption fails because of invalid envelop'() { + when: 'an event with an invalid envelop is received' + def invalidEvent = + new CpsDataUpdatedEvent() + .withId('my-id').withSource(anEventSource).withType(anEventType) + objectUnderTest.consume(invalidEvent) + then: 'an exception is thrown with 2 invalid fields' + def e = thrown(InvalidEventEnvelopException) + e.getInvalidFields().size() == 2 + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType)) + e.getInvalidFields().contains( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "source", anEventSource.toString(), + EventFixtures.defaultEventSource.toString())) + } + +} diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy new file mode 100644 index 0000000..44a28de --- /dev/null +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka + +import org.onap.cps.event.model.Content +import org.onap.cps.event.model.CpsDataUpdatedEvent +import org.onap.cps.event.model.Data + +import java.time.OffsetDateTime +import java.time.format.DateTimeFormatter + +/** + * This class contains utility fixtures methods for building and manipulating event data. + */ +class EventFixtures { + + static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + static String defaultEventType = 'org.onap.cps.data-updated-event' + static URI defaultEventSource = new URI('urn:cps:org.onap.cps') + + static CpsDataUpdatedEvent buildEvent(final Map map) { + CpsDataUpdatedEvent event = + new CpsDataUpdatedEvent() + .withId( + map.id != null ? map.id.toString() : UUID.randomUUID().toString()) + .withType( + map.eventType != null ? map.eventType.toString() : defaultEventType) + .withSource( + map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource) + .withContent( + new Content() + .withObservedTimestamp( + map.timestamp != null ? map.timestamp.toString() : currentTimestamp()) + .withDataspaceName( + map.dataspace != null ? map.dataspace.toString() : 'a-dataspace') + .withSchemaSetName( + map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set') + .withAnchorName( + map.anchor != null ? map.anchor.toString() : 'an-anchor') + .withData( + new Data().withAdditionalProperty( + map.dataName != null ? map.dataName.toString() : 'a-data-name', + map.dataValue != null ? map.dataValue : 'a-data-value'))) + + return event + } + + static String currentIsoTimestamp() { + return isoTimestampFormatter.format(OffsetDateTime.now()) + } + + static OffsetDateTime toOffsetDateTime(String timestamp) { + return OffsetDateTime.parse(timestamp, isoTimestampFormatter) + } + +} diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy new file mode 100644 index 0000000..132ff6d --- /dev/null +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy @@ -0,0 +1,131 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.model + +import com.fasterxml.jackson.core.JsonProcessingException +import org.mapstruct.factory.Mappers +import org.onap.cps.event.model.Content +import org.onap.cps.event.model.CpsDataUpdatedEvent +import org.onap.cps.event.model.Data +import org.onap.cps.temporal.domain.NetworkData +import spock.lang.Specification + +import java.time.OffsetDateTime +import java.time.format.DateTimeFormatter + +/** + * Test specification for data updated event mapper. + */ +class CpsDataUpdatedEventMapperSpec extends Specification { + + def objectUnderTest = Mappers.getMapper(CpsDataUpdatedEventMapper.class); + + def 'Mapping a null event'() { + given: 'a null event' + def event = null + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'the result entity is null' + result == null + } + + def 'Mapping an event whose properties are null'() { + given: 'an event whose properties are null' + def event = new CpsDataUpdatedEvent() + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'the result entity is not null' + result != null + and: 'all result entity properties are null' + assertEntityPropertiesAreNull(result) + } + + def 'Mapping an event whose content properties are null'() { + given: 'an event whose content properties are null' + def event = new CpsDataUpdatedEvent().withContent(new Content()) + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'the result entity is not null' + result != null + and: 'all result entity properties are null' + assertEntityPropertiesAreNull(result) + } + + def 'Mapping an event whose content data is empty'() { + given: 'an event whose content data is empty' + def event = new CpsDataUpdatedEvent().withContent(new Content().withData(new Data())) + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'the result entity is not null' + result != null + and: 'the result entity payload is an empty json ' + result.getPayload() == "{}" + } + + def 'Mapping an event whose content data is invalid'() { + given: 'an event whose content data is invalid' + def event = + new CpsDataUpdatedEvent().withContent(new Content().withData( + new Data().withAdditionalProperty(null, null))) + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'an runtime exception is thrown' + def e = thrown(RuntimeException) + e.getCause() instanceof JsonProcessingException + } + + def 'Mapping a valid complete event'() { + given: 'a valid complete event' + def isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + def aDataName = 'a-data-name' + def aDataValue = 'a-data-value' + def event = + new CpsDataUpdatedEvent() + .withContent( + new Content() + .withObservedTimestamp(isoTimestampFormatter.format(OffsetDateTime.now())) + .withDataspaceName('a-dataspace') + .withSchemaSetName('a-schema-set') + .withAnchorName('an-anchor') + .withData(new Data().withAdditionalProperty(aDataName, aDataValue))) + when: 'the event is mapped to an entity' + NetworkData result = objectUnderTest.eventToEntity(event) + then: 'the result entity is not null' + result != null + and: 'all result entity properties are the ones from the event' + result.getObservedTimestamp() == + OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter) + result.getDataspace() == event.getContent().getDataspaceName() + result.getSchemaSet() == event.getContent().getSchemaSetName() + result.getAnchor() == event.getContent().getAnchorName() + result.getPayload().contains(aDataValue) + result.getPayload().contains(aDataValue) + result.getCreatedTimestamp() == null + } + + private void assertEntityPropertiesAreNull(NetworkData networkData) { + assert networkData.getObservedTimestamp() == null + assert networkData.getDataspace() == null + assert networkData.getSchemaSet() == null + assert networkData.getAnchor() == null + assert networkData.getPayload() == null + assert networkData.getCreatedTimestamp() == null + } + +} diff --git a/src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy index f718bf4..c834f38 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy @@ -16,7 +16,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.temporal.controller +package org.onap.cps.temporal.controller.web import spock.lang.Specification @@ -34,4 +34,4 @@ class QueryControllerSpec extends Specification { ! response.empty } -}
\ No newline at end of file +} diff --git a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy index ec976ee..f66b35e 100644 --- a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy @@ -29,6 +29,9 @@ import spock.lang.Specification import java.time.OffsetDateTime +/** + * Test specification for network data repository. + */ @SpringBootTest @Testcontainers class NetworkDataRepositorySpec extends Specification { diff --git a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy index 70ac2bc..9847f54 100644 --- a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy @@ -18,28 +18,51 @@ package org.onap.cps.temporal.service +import org.onap.cps.temporal.domain.NetworkDataId + import java.time.OffsetDateTime import org.onap.cps.temporal.domain.NetworkData import org.onap.cps.temporal.repository.NetworkDataRepository import spock.lang.Specification +/** + * Test specification for network data service. + */ class NetworkDataServiceImplSpec extends Specification { - def objectUnderTest = new NetworkDataServiceImpl() - def mockNetworkDataRepository = Mock(NetworkDataRepository) + def objectUnderTest = new NetworkDataServiceImpl(mockNetworkDataRepository) + def networkData = new NetworkData() - def setup() { - objectUnderTest.networkDataRepository = mockNetworkDataRepository + def 'Add network data successfully.'() { + given: 'network data repository is persisting network data it is asked to save' + def persistedNetworkData = new NetworkData() + persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now()) + mockNetworkDataRepository.save(networkData) >> persistedNetworkData + when: 'a new network data is added' + def result = objectUnderTest.addNetworkData(networkData) + then: 'result network data is the one that has been persisted' + result == persistedNetworkData + result.getCreatedTimestamp() != null + networkData.getCreatedTimestamp() == null } - def 'Add network data in timeseries database.'() { + def 'Add network data fails because already added'() { + given: 'network data repository is not able to create data it is asked to persist ' + + 'and reveals it with null created timestamp on network data entity' + def persistedNetworkData = new NetworkData() + persistedNetworkData.setCreatedTimestamp(null) + mockNetworkDataRepository.save(networkData) >> persistedNetworkData + and: 'existing data can be retrieved' + def existing = new NetworkData() + existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1)) + mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing) when: 'a new network data is added' objectUnderTest.addNetworkData(networkData) - then: ' repository service is called with the correct parameters' - 1 * mockNetworkDataRepository.save(networkData) + then: 'network service exception is thrown' + thrown(ServiceException) } } diff --git a/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java b/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java index d47e8a5..a70e914 100644 --- a/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java +++ b/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java @@ -70,4 +70,4 @@ public class LayeredArchitectureTest { .should().onlyHaveDependentClassesThat() .resideInAnyPackage(SERVICE_PACKAGE, REPOSITORY_PACKAGE); -}
\ No newline at end of file +} diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index afaff6c..3ac13a9 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -20,17 +20,37 @@ server: spring: datasource: url: ${DB_URL} - password: ${DB_PASSWORD} username: ${DB_USERNAME} + password: ${DB_PASSWORD} liquibase: change-log: classpath:/db/changelog/changelog-master.xml jpa: - open-in-view: false properties: - hibernate: - dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.format_sql: true + hibernate.generate_statistics: false + kafka: + bootstrap-servers: localhost:9092 + security: + protocol: PLAINTEXT + consumer: + group-id: cps-temporal-group + # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers + # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer + # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/ + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + auto-offset-reset: earliest + properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent + # Following is not cps-temporal configuration. It is configuration for the producer used for integration tests + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer -logging: - level: - org: - springframework: INFO +app: + listener: + data-updated: + topic: cps.cfg-state-events |