diff options
author | Renu Kumari <renu.kumari@bell.ca> | 2021-12-08 11:16:58 -0500 |
---|---|---|
committer | Renu Kumari <renu.kumari@bell.ca> | 2021-12-15 00:02:06 +0000 |
commit | 60f41161ac9589dfaf0000587f216cae0e734142 (patch) | |
tree | 8f7e05f895c2c6de7b3edbbf767eea9d614109a6 /src | |
parent | 336dfaa28fe031d41f21f652d230eea34a4a21e5 (diff) |
Add support for delete operation
- Added new column operation in DB
- Updated existing rows to have 'UPDATE' value for operation field
- Added ability to process both V1 and V2 event schema
- Changed code and testcase to support operation field
Issue-ID: CPS-790
Signed-off-by: Renu Kumari <renu.kumari@bell.ca>
Change-Id: Ife24daa4b442e1499094b162727cc8704c25011e
Diffstat (limited to 'src')
14 files changed, 315 insertions, 141 deletions
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java index 5fce94e..2ae675e 100644 --- a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java index d180509..cb06d5f 100644 --- a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java +++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java @@ -25,10 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.time.OffsetDateTime; import org.mapstruct.Mapper; import org.mapstruct.Mapping; +import org.onap.cps.event.model.Content; import org.onap.cps.event.model.CpsDataUpdatedEvent; import org.onap.cps.event.model.Data; import org.onap.cps.temporal.controller.utils.DateTimeUtility; import org.onap.cps.temporal.domain.NetworkData; +import org.onap.cps.temporal.domain.Operation; /** * Mapper for data updated event schema. @@ -43,6 +45,7 @@ public abstract class CpsDataUpdatedEventMapper { @Mapping(source = "content.schemaSetName", target = "schemaSet") @Mapping(source = "content.anchorName", target = "anchor") @Mapping(source = "content.data", target = "payload") + @Mapping(source = "content.operation", target = "operation") @Mapping(expression = "java(null)", target = "createdTimestamp") public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent); @@ -50,6 +53,10 @@ public abstract class CpsDataUpdatedEventMapper { return data != null ? objectMapper.writeValueAsString(data) : null; } + Operation map(final Content.Operation inputOperation) { + return inputOperation == null ? Operation.UPDATE : Operation.valueOf(inputOperation.toString()); + } + OffsetDateTime map(final String timestamp) { return DateTimeUtility.toOffsetDateTime(timestamp); } diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java index 1537e4a..e147871 100644 --- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java +++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -23,6 +25,8 @@ import java.io.Serializable; import java.time.OffsetDateTime; import javax.persistence.Column; import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; import javax.persistence.Id; import javax.persistence.IdClass; import javax.persistence.Table; @@ -70,6 +74,10 @@ public class NetworkData implements Serializable { private String schemaSet; @NotNull + @Column(updatable = false) + @Enumerated(EnumType.STRING) + private Operation operation; + @Type(type = "jsonb") @Column(columnDefinition = "jsonb", updatable = false) private String payload; diff --git a/src/main/java/org/onap/cps/temporal/domain/Operation.java b/src/main/java/org/onap/cps/temporal/domain/Operation.java new file mode 100644 index 0000000..06b5099 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/domain/Operation.java @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.domain; + +public enum Operation { + CREATE, + UPDATE, + DELETE +} diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java index 3eba6fb..8d282b4 100644 --- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java +++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java @@ -25,6 +25,7 @@ import javax.validation.ValidationException; import lombok.extern.slf4j.Slf4j; import org.onap.cps.temporal.domain.NetworkData; import org.onap.cps.temporal.domain.NetworkDataId; +import org.onap.cps.temporal.domain.Operation; import org.onap.cps.temporal.domain.SearchCriteria; import org.onap.cps.temporal.repository.NetworkDataRepository; import org.springframework.beans.factory.annotation.Value; @@ -42,26 +43,35 @@ public class NetworkDataServiceImpl implements NetworkDataService { private final int maxPageSize; public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository, - final @Value("${app.query.response.max-page-size}") int maxPageSize) { + final @Value("${app.query.response.max-page-size}") int maxPageSize) { this.networkDataRepository = networkDataRepository; this.maxPageSize = maxPageSize; } @Override public NetworkData addNetworkData(final NetworkData networkData) { + validateNetworkData(networkData); final var savedNetworkData = networkDataRepository.save(networkData); if (savedNetworkData.getCreatedTimestamp() == null) { // Data already exists and can not be inserted final var id = - new NetworkDataId( - networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor()); + new NetworkDataId( + networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor()); final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id); throw new ServiceException( - "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null))); + "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null))); } return savedNetworkData; } + private void validateNetworkData(final NetworkData networkData) { + if (networkData.getOperation() != Operation.DELETE + && networkData.getPayload() == null) { + throw new ValidationException( + String.format("The operation %s must not have null payload", networkData.getOperation())); + } + } + @Override public Slice<NetworkData> searchNetworkData(final SearchCriteria searchCriteria) { if (searchCriteria.getPageable().getPageSize() > maxPageSize) { diff --git a/src/main/resources/db/changelog/changelog-master.xml b/src/main/resources/db/changelog/changelog-master.xml index 6ec36fb..7986d5e 100644 --- a/src/main/resources/db/changelog/changelog-master.xml +++ b/src/main/resources/db/changelog/changelog-master.xml @@ -26,5 +26,5 @@ <include file="db/changelog/schema/01-init-schema.xml"/> <include file="db/changelog/data/02-init-data.xml"/> <include file="db/changelog/schema/03-rename-network-data-timestamp-fields.xml"/> - + <include file="db/changelog/schema/04-added-operation-field-in-network-data.xml"/> </databaseChangeLog> diff --git a/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml b/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml new file mode 100644 index 0000000..62b93b9 --- /dev/null +++ b/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + Copyright (c) 2021 Bell Canada. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + SPDX-License-Identifier: Apache-2.0 + ============LICENSE_END========================================================= +--> + +<databaseChangeLog + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog + http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd"> + + <changeSet id="4.1" author="cps"> + <comment>Add operation field in network data timescale table + and set default 'UPDATE' value for existing data + </comment> + <addColumn tableName="network_data"> + <column name="operation" type="VARCHAR(20)" + remarks="Field to store the operation type"> + </column> + </addColumn> + <update tableName="network_data"> + <column name="operation" value="UPDATE"/> + <where>operation is NULL</where> + </update> + <rollback> + <dropColumn tableName="network_data"> + <column name="operation"/> + </dropColumn> + </rollback> + </changeSet> + <changeSet id="4.2" author="cps"> + <comment>Remove not null constraint from payload to support delete operation</comment> + <dropNotNullConstraint tableName="network_data" columnName="payload"/> + </changeSet> +</databaseChangeLog> diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy index 2ba011f..01bb92d 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy @@ -6,19 +6,20 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= -*/ + */ package org.onap.cps.temporal.controller.event.listener.kafka -import groovy.util.logging.Slf4j import org.onap.cps.event.model.CpsDataUpdatedEvent import org.onap.cps.temporal.repository.containers.TimescaleContainer import org.springframework.beans.factory.annotation.Autowired @@ -42,16 +43,16 @@ import java.util.concurrent.TimeUnit */ @SpringBootTest @Testcontainers -@Slf4j class DataUpdatedEventListenerIntegrationSpec extends Specification { @Shared - TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance() + TimescaleContainer timescaleTestContainer = TimescaleContainer.getInstance() static kafkaTestContainer = new KafkaContainer() static { Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) } + def setupSpec() { kafkaTestContainer.start() } @@ -65,60 +66,49 @@ class DataUpdatedEventListenerIntegrationSpec extends Specification { @Value('${app.listener.data-updated.topic}') String topic - // Define event data - def aTimestamp = EventFixtures.currentIsoTimestamp() - def aDataspace = 'my-dataspace' - def aSchemaSet = 'my-schema-set' - def anAnchor = 'my-anchor' - - // Define sql queries for data validation - def sqlCount = "select count(*) from network_data" - def sqlSelect = "select * from network_data" - def sqlWhereClause = - ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' + - 'and dataspace = ? ' + - 'and schema_set = ? ' + - 'and anchor = ?' - def sqlCountWithConditions = sqlCount + sqlWhereClause - def sqlSelectWithConditions = sqlSelect + sqlWhereClause - def 'Processing a valid event'() { - given: "no event has been proceeded" - def initialRecordsCount = - jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class, - aTimestamp, aDataspace, aSchemaSet, anAnchor) - assert (initialRecordsCount == 0) + def aTimestamp = EventFixtures.currentIsoTimestamp() + given: 'no data exist for the anchor' + assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 0 when: 'an event is produced' def event = EventFixtures.buildEvent( - timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor) + observedTimestamp: aTimestamp, dataspace: 'my-dataspace', schemaSet: 'my-schema-set', + anchor: 'my-anchor', data: ['my-data-name': 'my-data-value']) this.kafkaTemplate.send(topic, event) - then: 'the event is proceeded' + then: 'the event is processed and data exists in database' def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2) pollingCondition.eventually { - def finalRecordsCount = - jdbcTemplate.queryForObject( - sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor) - assert (finalRecordsCount == 1) + assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 1 } - Map<String, Object> result = - jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor) - log.debug("Data retrieved from db: {}", result) } def 'Processing an invalid event'() { given: 'the number of network data records if known' - def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class) + def initialRecordsCount = networkDataAllRecordCount() when: 'an invalid event is produced' this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null) then: 'the event is not proceeded and no more network data record is created' TimeUnit.SECONDS.sleep(3) - assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount) + networkDataAllRecordCount() == initialRecordsCount + } + + def networkDataAllRecordCount() { + return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data', Integer.class) + } + + def networkDataConditionalCount(observedTimestamp, dataspaceName, schemaSetName, anchorName) { + return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data ' + + 'WHERE observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' + + 'AND dataspace = ? ' + + 'AND schema_set = ? ' + + 'AND anchor = ?', + Integer.class, observedTimestamp, dataspaceName, schemaSetName, anchorName) } @DynamicPropertySource static void registerKafkaProperties(DynamicPropertyRegistry registry) { - registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers) + registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) } } diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy index 055147f..e7e2570 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy @@ -6,22 +6,26 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.cps.temporal.controller.event.listener.kafka import org.mapstruct.factory.Mappers +import org.onap.cps.event.model.Content import org.onap.cps.event.model.CpsDataUpdatedEvent import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper +import org.onap.cps.temporal.domain.Operation import org.onap.cps.temporal.service.NetworkDataService import spock.lang.Specification @@ -36,15 +40,9 @@ class DataUpdatedEventListenerSpec extends Specification { public static final String EXPECTED_SCHEMA_EXCEPTION_MESSAGE = 'urn:cps:org.onap.cps:data-updated-event-schema:v99' // Define event data - def anEventType = 'my-event-type' def anEventSchema = new URI('my-event-schema') def anEventSource = new URI('my-event-source') def aTimestamp = EventFixtures.currentIsoTimestamp() - def aDataspace = 'my-dataspace' - def aSchemaSet = 'my-schema-set' - def anAnchor = 'my-anchor' - def aDataName = 'my-data-name' - def aDataValue = 'my-data-value' // Define service mock def mockService = Mock(NetworkDataService) @@ -56,21 +54,54 @@ class DataUpdatedEventListenerSpec extends Specification { def objectUnderTest = new DataUpdatedEventListener(mockService, mapper) def 'Event message consumption'() { - when: 'an event is received' + when: 'an event is received #scenario' + def defaultEventProperties = [observedTimestamp: aTimestamp, + dataspace : 'my-dataspace', + schemaSet : 'my-schema-set', + anchor : 'my-anchor', + data : ['my-data-name': 'my-data-value']] + def addOperationField = specifiedOperation != null ? [operation: Content.Operation.valueOf(specifiedOperation)] : [] def event = - EventFixtures.buildEvent( - timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor, - dataName: aDataName, dataValue: aDataValue) + EventFixtures.buildEvent(defaultEventProperties + addOperationField) objectUnderTest.consume(event) then: 'network data service is requested to persisted the data change' 1 * mockService.addNetworkData( { it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp) - && it.getDataspace() == aDataspace - && it.getSchemaSet() == aSchemaSet - && it.getAnchor() == anAnchor - && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue) - && it.getCreatedTimestamp() == null + && it.getDataspace() == 'my-dataspace' + && it.getSchemaSet() == 'my-schema-set' + && it.getAnchor() == 'my-anchor' + && it.getCreatedTimestamp() == null + && it.getOperation() == expectedOperation + && it.getPayload() == '{"my-data-name":"my-data-value"}' + + } + ) + where: + scenario | specifiedOperation || expectedOperation + 'without operation field' | null || Operation.UPDATE + 'create operation' | 'CREATE' || Operation.CREATE + } + + def 'Delete Event message consumption'() { + when: 'an delete event is received' + def deleteEvent = + EventFixtures.buildEvent([observedTimestamp: aTimestamp, + dataspace : 'my-dataspace', + schemaSet : 'my-schema-set', + anchor : 'my-anchor', + operation : Content.Operation.DELETE]) + objectUnderTest.consume(deleteEvent) + then: 'network data service is requested to persisted the data change' + 1 * mockService.addNetworkData( + { + it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp) + && it.getDataspace() == 'my-dataspace' + && it.getSchemaSet() == 'my-schema-set' + && it.getAnchor() == 'my-anchor' + && it.getCreatedTimestamp() == null + && it.getOperation() == Operation.DELETE + && it.getPayload() == null } ) } @@ -85,16 +116,16 @@ class DataUpdatedEventListenerSpec extends Specification { e.getInvalidFields().size() == 4 e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED,"schema", null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE)) + UNEXPECTED, 'schema', null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - MISSING, "id", null, null)) + MISSING, 'id', null, null)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString())) + UNEXPECTED, 'source', null, EventFixtures.defaultEventSource.toString())) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED, "type", null, EventFixtures.defaultEventType)) + UNEXPECTED, 'type', null, EventFixtures.defaultEventType)) e.getMessage().contains(e.getInvalidFields().toString()) } @@ -105,7 +136,7 @@ class DataUpdatedEventListenerSpec extends Specification { .withId('my-id') .withSchema(anEventSchema) .withSource(anEventSource) - .withType(anEventType) + .withType('my-event-type') objectUnderTest.consume(invalidEvent) then: 'an exception is thrown with 2 invalid fields' def e = thrown(InvalidEventEnvelopException) @@ -113,14 +144,14 @@ class DataUpdatedEventListenerSpec extends Specification { e.getInvalidFields().size() == 3 e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED, "schema", anEventSchema.toString(), + UNEXPECTED, 'schema', anEventSchema.toString(), EXPECTED_SCHEMA_EXCEPTION_MESSAGE)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType)) + UNEXPECTED, 'type', 'my-event-type', EventFixtures.defaultEventType)) e.getInvalidFields().contains( new InvalidEventEnvelopException.InvalidField( - UNEXPECTED, "source", anEventSource.toString(), + UNEXPECTED, 'source', anEventSource.toString(), EventFixtures.defaultEventSource.toString())) } diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy index 7c4dee6..95c47ce 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -32,36 +34,40 @@ class EventFixtures { static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") static String defaultEventType = 'org.onap.cps.data-updated-event' - static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') + static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v2') static URI defaultEventSource = new URI('urn:cps:org.onap.cps') - static CpsDataUpdatedEvent buildEvent(final Map map) { - CpsDataUpdatedEvent event = - new CpsDataUpdatedEvent() - .withSchema( - map.eventSchema != null ? new URI(map.eventSchema.toString()) : defaultEventSchema) - .withId( - map.id != null ? map.id.toString() : UUID.randomUUID().toString()) - .withType( - map.eventType != null ? map.eventType.toString() : defaultEventType) - .withSource( - map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource) - .withContent( - new Content() - .withObservedTimestamp( - map.timestamp != null ? map.timestamp.toString() : currentTimestamp()) - .withDataspaceName( - map.dataspace != null ? map.dataspace.toString() : 'a-dataspace') - .withSchemaSetName( - map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set') - .withAnchorName( - map.anchor != null ? map.anchor.toString() : 'an-anchor') - .withData( - new Data().withAdditionalProperty( - map.dataName != null ? map.dataName.toString() : 'a-data-name', - map.dataValue != null ? map.dataValue : 'a-data-value'))) + static def defaultEventValue = [ + eventSchema : defaultEventSchema, + id : UUID.randomUUID().toString(), + eventType : defaultEventType, + eventSource : defaultEventSource + ] + + static CpsDataUpdatedEvent buildEvent(final Map inputMap) { + def mergedMap = defaultEventValue + inputMap + + def dataExist = mergedMap.containsKey('data') + Data data = null + if (dataExist) { + data = new Data() + mergedMap.data.each { k, v -> data.withAdditionalProperty(k, v) } + } + + def content = new Content() + .withObservedTimestamp(mergedMap.observedTimestamp) + .withDataspaceName(mergedMap.dataspace) + .withSchemaSetName(mergedMap.schemaSet) + .withAnchorName(mergedMap.anchor) + .withOperation(mergedMap.operation) + .withData(data) - return event + return new CpsDataUpdatedEvent() + .withSchema(mergedMap.eventSchema) + .withId(mergedMap.id) + .withType(mergedMap.eventType) + .withSource(mergedMap.eventSource) + .withContent(content) } static String currentIsoTimestamp() { diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy index a51c4fe..0c82782 100644 --- a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -24,6 +26,7 @@ import org.onap.cps.event.model.Content import org.onap.cps.event.model.CpsDataUpdatedEvent import org.onap.cps.event.model.Data import org.onap.cps.temporal.domain.NetworkData +import org.onap.cps.temporal.domain.Operation import spock.lang.Specification import java.time.OffsetDateTime @@ -75,7 +78,7 @@ class CpsDataUpdatedEventMapperSpec extends Specification { then: 'the result entity is not null' result != null and: 'the result entity payload is an empty json ' - result.getPayload() == "{}" + result.getPayload() == '{}' } def 'Mapping an event whose content data is invalid'() { @@ -103,20 +106,33 @@ class CpsDataUpdatedEventMapperSpec extends Specification { .withDataspaceName('a-dataspace') .withSchemaSetName('a-schema-set') .withAnchorName('an-anchor') + .withOperation(Content.Operation.CREATE) .withData(new Data().withAdditionalProperty(aDataName, aDataValue))) when: 'the event is mapped to an entity' NetworkData result = objectUnderTest.eventToEntity(event) then: 'the result entity is not null' result != null and: 'all result entity properties are the ones from the event' - result.getObservedTimestamp() == - OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter) - result.getDataspace() == event.getContent().getDataspaceName() - result.getSchemaSet() == event.getContent().getSchemaSetName() - result.getAnchor() == event.getContent().getAnchorName() + with(result) { + observedTimestamp == + OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter) + dataspace == event.getContent().getDataspaceName() + schemaSet == event.getContent().getSchemaSetName() + operation == Operation.CREATE + anchor == event.getContent().getAnchorName() + createdTimestamp == null + } result.getPayload().contains(aDataValue) result.getPayload().contains(aDataValue) - result.getCreatedTimestamp() == null + } + + def 'Mapping event without operation field' () { + given: 'event without operation field in content' + def cpsDataUpdatedEvent = new CpsDataUpdatedEvent().withContent(new Content()) + when: 'event is mapped to network data' + def networkData = objectUnderTest.eventToEntity(cpsDataUpdatedEvent) + then: 'the operation field has default UPDATE value' + networkData.operation == Operation.UPDATE } private void assertEntityPropertiesAreNull(NetworkData networkData) { diff --git a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy index 2c7fc5e..d076f4d 100644 --- a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy @@ -21,6 +21,7 @@ package org.onap.cps.temporal.repository import org.onap.cps.temporal.domain.NetworkData +import org.onap.cps.temporal.domain.Operation import org.onap.cps.temporal.repository.containers.TimescaleContainer import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase @@ -51,8 +52,13 @@ class NetworkDataRepositorySpec extends Specification { @Autowired NetworkDataRepository networkDataRepository - def networkData = NetworkData.builder().observedTimestamp(observedTimestamp).dataspace(myDataspaceName) - .schemaSet(mySchemaSetName).anchor(myAnchorName).payload(payload).build() + def networkData = NetworkData.builder() + .observedTimestamp(observedTimestamp) + .dataspace(myDataspaceName) + .schemaSet(mySchemaSetName) + .anchor(myAnchorName) + .operation(Operation.CREATE) + .payload(payload).build() @Shared TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance() @@ -62,11 +68,14 @@ class NetworkDataRepositorySpec extends Specification { NetworkData savedData = networkDataRepository.save(networkData) TestTransaction.end() then: ' the saved Network Data is returned' - savedData.getDataspace() == networkData.getDataspace() - savedData.getSchemaSet() == networkData.getSchemaSet() - savedData.getAnchor() == networkData.getAnchor() - savedData.getPayload() == networkData.getPayload() - savedData.getObservedTimestamp() == networkData.getObservedTimestamp() + with(savedData) { + dataspace == networkData.getDataspace() + schemaSet == networkData.getSchemaSet() + anchor == networkData.getAnchor() + payload == networkData.getPayload() + observedTimestamp == networkData.getObservedTimestamp() + operation == networkData.operation + } and: ' createdTimestamp is auto populated by db ' networkData.getCreatedTimestamp() == null savedData.getCreatedTimestamp() != null diff --git a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy index 2e04ca8..952faf7 100644 --- a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy +++ b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy @@ -21,6 +21,7 @@ package org.onap.cps.temporal.service import org.onap.cps.temporal.domain.NetworkDataId +import org.onap.cps.temporal.domain.Operation import org.onap.cps.temporal.domain.SearchCriteria import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired @@ -51,10 +52,12 @@ class NetworkDataServiceImplSpec extends Specification { @Value('${app.query.response.max-page-size}') int maxPageSize - def networkData = new NetworkData() + def networkData = NetworkData.builder().operation(Operation.UPDATE).payload("{}").build() def 'Add network data successfully.'() { - given: 'network data repository is persisting network data it is asked to save' + given: 'a network data' + def networkData = NetworkData.builder().operation(operation).payload(payload).build() + and: 'network data repository is persisting network data' def persistedNetworkData = new NetworkData() persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now()) mockNetworkDataRepository.save(networkData) >> persistedNetworkData @@ -64,17 +67,36 @@ class NetworkDataServiceImplSpec extends Specification { result == persistedNetworkData result.getCreatedTimestamp() != null networkData.getCreatedTimestamp() == null + where: 'the following data is used' + operation | payload + Operation.CREATE | '{ "key" : "value" }' + Operation.UPDATE | '{ "key" : "value" }' + Operation.DELETE | null + } + + def 'Error Handling: Payload missing for #operation'() { + given: 'a network data' + def networkData = NetworkData.builder().operation(operation).build() + when: 'a new network data is added' + objectUnderTest.addNetworkData(networkData) + then: 'Validation exception is thrown' + def exception = thrown(ValidationException) + exception.getMessage().contains('null payload') + where: 'following operations are used' + operation << [ Operation.CREATE, Operation.UPDATE] } def 'Add network data fails because already added'() { given: 'network data repository is not able to create data it is asked to persist ' + - 'and reveals it with null created timestamp on network data entity' + 'and reveals it with null created timestamp on network data entity' def persistedNetworkData = new NetworkData() persistedNetworkData.setCreatedTimestamp(null) mockNetworkDataRepository.save(networkData) >> persistedNetworkData and: 'existing data can be retrieved' def existing = new NetworkData() + existing.setOperation(Operation.UPDATE) + existing.setPayload('{}') existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1)) mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing) when: 'a new network data is added' @@ -86,35 +108,30 @@ class NetworkDataServiceImplSpec extends Specification { def 'Query network data by search criteria.'() { given: 'search criteria' def searchCriteria = SearchCriteria.builder() - .dataspaceName('my-dataspaceName') - .schemaSetName('my-schemaset') - .pagination(0, 10) - .build() + .dataspaceName('my-dataspaceName') + .schemaSetName('my-schemaset') + .pagination(0, 10) + .build() and: 'response from repository' def pageFromRepository = new PageImpl<>(Collections.emptyList(), searchCriteria.getPageable(), 10) mockNetworkDataRepository.findBySearchCriteria(searchCriteria) >> pageFromRepository - when: 'search is executed' def resultPage = objectUnderTest.searchNetworkData(searchCriteria) - then: 'data is fetched from repository and returned' resultPage == pageFromRepository - } def 'Query network data with more than max page-size'() { given: 'search criteria with more than max page size' def searchCriteria = SearchCriteria.builder() - .dataspaceName('my-dataspaceName') - .schemaSetName('my-schemaset') - .pagination(0, maxPageSize + 1) - .build() + .dataspaceName('my-dataspaceName') + .schemaSetName('my-schemaset') + .pagination(0, maxPageSize + 1) + .build() when: 'search is executed' objectUnderTest.searchNetworkData(searchCriteria) - - then: 'throws error' + then: 'a validation exception is thrown' thrown(ValidationException) - } } diff --git a/src/test/resources/data/network-data-changes.sql b/src/test/resources/data/network-data-changes.sql index ce15f19..6ed52d6 100644 --- a/src/test/resources/data/network-data-changes.sql +++ b/src/test/resources/data/network-data-changes.sql @@ -5,25 +5,25 @@ COMMIT; -- Test pagination data -- Test created Before filter -- Test observed After Filter -INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP) +INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP) VALUES -('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'), -('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'), -('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-23 23:00:01.000'); +('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'), +('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'), +('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-23 23:00:01.000'); -- Test sorting on multiple fields -INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP) +INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP) VALUES -('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000'); +('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000'); -- Test simple payload filter on multiple field -INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP) +INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP) VALUES -('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'), -('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'), -('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'), -('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "03", "status" : "up" } ]}'::jsonb, '2021-07-24 04:00:01.000'); +('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'), +('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'), +('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'), +('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-24 04:00:01.000'); COMMIT; |