aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorRenu Kumari <renu.kumari@bell.ca>2021-12-08 11:16:58 -0500
committerRenu Kumari <renu.kumari@bell.ca>2021-12-15 00:02:06 +0000
commit60f41161ac9589dfaf0000587f216cae0e734142 (patch)
tree8f7e05f895c2c6de7b3edbbf767eea9d614109a6 /src
parent336dfaa28fe031d41f21f652d230eea34a4a21e5 (diff)
Add support for delete operation
- Added new column operation in DB - Updated existing rows to have 'UPDATE' value for operation field - Added ability to process both V1 and V2 event schema - Changed code and testcase to support operation field Issue-ID: CPS-790 Signed-off-by: Renu Kumari <renu.kumari@bell.ca> Change-Id: Ife24daa4b442e1499094b162727cc8704c25011e
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java4
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java7
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/NetworkData.java10
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/Operation.java27
-rw-r--r--src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java18
-rw-r--r--src/main/resources/db/changelog/changelog-master.xml2
-rw-r--r--src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml51
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy68
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy79
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy62
-rw-r--r--src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy32
-rw-r--r--src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy23
-rw-r--r--src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy51
-rw-r--r--src/test/resources/data/network-data-changes.sql22
14 files changed, 315 insertions, 141 deletions
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
index 5fce94e..2ae675e 100644
--- a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
index d180509..cb06d5f 100644
--- a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
+++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
@@ -25,10 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
+import org.onap.cps.event.model.Content;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.onap.cps.event.model.Data;
import org.onap.cps.temporal.controller.utils.DateTimeUtility;
import org.onap.cps.temporal.domain.NetworkData;
+import org.onap.cps.temporal.domain.Operation;
/**
* Mapper for data updated event schema.
@@ -43,6 +45,7 @@ public abstract class CpsDataUpdatedEventMapper {
@Mapping(source = "content.schemaSetName", target = "schemaSet")
@Mapping(source = "content.anchorName", target = "anchor")
@Mapping(source = "content.data", target = "payload")
+ @Mapping(source = "content.operation", target = "operation")
@Mapping(expression = "java(null)", target = "createdTimestamp")
public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent);
@@ -50,6 +53,10 @@ public abstract class CpsDataUpdatedEventMapper {
return data != null ? objectMapper.writeValueAsString(data) : null;
}
+ Operation map(final Content.Operation inputOperation) {
+ return inputOperation == null ? Operation.UPDATE : Operation.valueOf(inputOperation.toString());
+ }
+
OffsetDateTime map(final String timestamp) {
return DateTimeUtility.toOffsetDateTime(timestamp);
}
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
index 1537e4a..e147871 100644
--- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
+++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -23,6 +25,8 @@ import java.io.Serializable;
import java.time.OffsetDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;
@@ -70,6 +74,10 @@ public class NetworkData implements Serializable {
private String schemaSet;
@NotNull
+ @Column(updatable = false)
+ @Enumerated(EnumType.STRING)
+ private Operation operation;
+
@Type(type = "jsonb")
@Column(columnDefinition = "jsonb", updatable = false)
private String payload;
diff --git a/src/main/java/org/onap/cps/temporal/domain/Operation.java b/src/main/java/org/onap/cps/temporal/domain/Operation.java
new file mode 100644
index 0000000..06b5099
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/domain/Operation.java
@@ -0,0 +1,27 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.domain;
+
+public enum Operation {
+ CREATE,
+ UPDATE,
+ DELETE
+}
diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
index 3eba6fb..8d282b4 100644
--- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
+++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
@@ -25,6 +25,7 @@ import javax.validation.ValidationException;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.temporal.domain.NetworkData;
import org.onap.cps.temporal.domain.NetworkDataId;
+import org.onap.cps.temporal.domain.Operation;
import org.onap.cps.temporal.domain.SearchCriteria;
import org.onap.cps.temporal.repository.NetworkDataRepository;
import org.springframework.beans.factory.annotation.Value;
@@ -42,26 +43,35 @@ public class NetworkDataServiceImpl implements NetworkDataService {
private final int maxPageSize;
public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository,
- final @Value("${app.query.response.max-page-size}") int maxPageSize) {
+ final @Value("${app.query.response.max-page-size}") int maxPageSize) {
this.networkDataRepository = networkDataRepository;
this.maxPageSize = maxPageSize;
}
@Override
public NetworkData addNetworkData(final NetworkData networkData) {
+ validateNetworkData(networkData);
final var savedNetworkData = networkDataRepository.save(networkData);
if (savedNetworkData.getCreatedTimestamp() == null) {
// Data already exists and can not be inserted
final var id =
- new NetworkDataId(
- networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
+ new NetworkDataId(
+ networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id);
throw new ServiceException(
- "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
+ "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
}
return savedNetworkData;
}
+ private void validateNetworkData(final NetworkData networkData) {
+ if (networkData.getOperation() != Operation.DELETE
+ && networkData.getPayload() == null) {
+ throw new ValidationException(
+ String.format("The operation %s must not have null payload", networkData.getOperation()));
+ }
+ }
+
@Override
public Slice<NetworkData> searchNetworkData(final SearchCriteria searchCriteria) {
if (searchCriteria.getPageable().getPageSize() > maxPageSize) {
diff --git a/src/main/resources/db/changelog/changelog-master.xml b/src/main/resources/db/changelog/changelog-master.xml
index 6ec36fb..7986d5e 100644
--- a/src/main/resources/db/changelog/changelog-master.xml
+++ b/src/main/resources/db/changelog/changelog-master.xml
@@ -26,5 +26,5 @@
<include file="db/changelog/schema/01-init-schema.xml"/>
<include file="db/changelog/data/02-init-data.xml"/>
<include file="db/changelog/schema/03-rename-network-data-timestamp-fields.xml"/>
-
+ <include file="db/changelog/schema/04-added-operation-field-in-network-data.xml"/>
</databaseChangeLog>
diff --git a/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml b/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml
new file mode 100644
index 0000000..62b93b9
--- /dev/null
+++ b/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (c) 2021 Bell Canada.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ SPDX-License-Identifier: Apache-2.0
+ ============LICENSE_END=========================================================
+-->
+
+<databaseChangeLog
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
+ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+
+ <changeSet id="4.1" author="cps">
+ <comment>Add operation field in network data timescale table
+ and set default 'UPDATE' value for existing data
+ </comment>
+ <addColumn tableName="network_data">
+ <column name="operation" type="VARCHAR(20)"
+ remarks="Field to store the operation type">
+ </column>
+ </addColumn>
+ <update tableName="network_data">
+ <column name="operation" value="UPDATE"/>
+ <where>operation is NULL</where>
+ </update>
+ <rollback>
+ <dropColumn tableName="network_data">
+ <column name="operation"/>
+ </dropColumn>
+ </rollback>
+ </changeSet>
+ <changeSet id="4.2" author="cps">
+ <comment>Remove not null constraint from payload to support delete operation</comment>
+ <dropNotNullConstraint tableName="network_data" columnName="payload"/>
+ </changeSet>
+</databaseChangeLog>
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
index 2ba011f..01bb92d 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
@@ -6,19 +6,20 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
-*/
+ */
package org.onap.cps.temporal.controller.event.listener.kafka
-import groovy.util.logging.Slf4j
import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.onap.cps.temporal.repository.containers.TimescaleContainer
import org.springframework.beans.factory.annotation.Autowired
@@ -42,16 +43,16 @@ import java.util.concurrent.TimeUnit
*/
@SpringBootTest
@Testcontainers
-@Slf4j
class DataUpdatedEventListenerIntegrationSpec extends Specification {
@Shared
- TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()
+ TimescaleContainer timescaleTestContainer = TimescaleContainer.getInstance()
static kafkaTestContainer = new KafkaContainer()
static {
Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
}
+
def setupSpec() {
kafkaTestContainer.start()
}
@@ -65,60 +66,49 @@ class DataUpdatedEventListenerIntegrationSpec extends Specification {
@Value('${app.listener.data-updated.topic}')
String topic
- // Define event data
- def aTimestamp = EventFixtures.currentIsoTimestamp()
- def aDataspace = 'my-dataspace'
- def aSchemaSet = 'my-schema-set'
- def anAnchor = 'my-anchor'
-
- // Define sql queries for data validation
- def sqlCount = "select count(*) from network_data"
- def sqlSelect = "select * from network_data"
- def sqlWhereClause =
- ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
- 'and dataspace = ? ' +
- 'and schema_set = ? ' +
- 'and anchor = ?'
- def sqlCountWithConditions = sqlCount + sqlWhereClause
- def sqlSelectWithConditions = sqlSelect + sqlWhereClause
-
def 'Processing a valid event'() {
- given: "no event has been proceeded"
- def initialRecordsCount =
- jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
- aTimestamp, aDataspace, aSchemaSet, anAnchor)
- assert (initialRecordsCount == 0)
+ def aTimestamp = EventFixtures.currentIsoTimestamp()
+ given: 'no data exist for the anchor'
+ assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 0
when: 'an event is produced'
def event =
EventFixtures.buildEvent(
- timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
+ observedTimestamp: aTimestamp, dataspace: 'my-dataspace', schemaSet: 'my-schema-set',
+ anchor: 'my-anchor', data: ['my-data-name': 'my-data-value'])
this.kafkaTemplate.send(topic, event)
- then: 'the event is proceeded'
+ then: 'the event is processed and data exists in database'
def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
pollingCondition.eventually {
- def finalRecordsCount =
- jdbcTemplate.queryForObject(
- sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
- assert (finalRecordsCount == 1)
+ assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 1
}
- Map<String, Object> result =
- jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
- log.debug("Data retrieved from db: {}", result)
}
def 'Processing an invalid event'() {
given: 'the number of network data records if known'
- def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
+ def initialRecordsCount = networkDataAllRecordCount()
when: 'an invalid event is produced'
this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
then: 'the event is not proceeded and no more network data record is created'
TimeUnit.SECONDS.sleep(3)
- assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
+ networkDataAllRecordCount() == initialRecordsCount
+ }
+
+ def networkDataAllRecordCount() {
+ return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data', Integer.class)
+ }
+
+ def networkDataConditionalCount(observedTimestamp, dataspaceName, schemaSetName, anchorName) {
+ return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data ' +
+ 'WHERE observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
+ 'AND dataspace = ? ' +
+ 'AND schema_set = ? ' +
+ 'AND anchor = ?',
+ Integer.class, observedTimestamp, dataspaceName, schemaSetName, anchorName)
}
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry registry) {
- registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
+ registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
}
}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
index 055147f..e7e2570 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
@@ -6,22 +6,26 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.cps.temporal.controller.event.listener.kafka
import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.Content
import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException
import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper
+import org.onap.cps.temporal.domain.Operation
import org.onap.cps.temporal.service.NetworkDataService
import spock.lang.Specification
@@ -36,15 +40,9 @@ class DataUpdatedEventListenerSpec extends Specification {
public static final String EXPECTED_SCHEMA_EXCEPTION_MESSAGE = 'urn:cps:org.onap.cps:data-updated-event-schema:v99'
// Define event data
- def anEventType = 'my-event-type'
def anEventSchema = new URI('my-event-schema')
def anEventSource = new URI('my-event-source')
def aTimestamp = EventFixtures.currentIsoTimestamp()
- def aDataspace = 'my-dataspace'
- def aSchemaSet = 'my-schema-set'
- def anAnchor = 'my-anchor'
- def aDataName = 'my-data-name'
- def aDataValue = 'my-data-value'
// Define service mock
def mockService = Mock(NetworkDataService)
@@ -56,21 +54,54 @@ class DataUpdatedEventListenerSpec extends Specification {
def objectUnderTest = new DataUpdatedEventListener(mockService, mapper)
def 'Event message consumption'() {
- when: 'an event is received'
+ when: 'an event is received #scenario'
+ def defaultEventProperties = [observedTimestamp: aTimestamp,
+ dataspace : 'my-dataspace',
+ schemaSet : 'my-schema-set',
+ anchor : 'my-anchor',
+ data : ['my-data-name': 'my-data-value']]
+ def addOperationField = specifiedOperation != null ? [operation: Content.Operation.valueOf(specifiedOperation)] : []
def event =
- EventFixtures.buildEvent(
- timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor,
- dataName: aDataName, dataValue: aDataValue)
+ EventFixtures.buildEvent(defaultEventProperties + addOperationField)
objectUnderTest.consume(event)
then: 'network data service is requested to persisted the data change'
1 * mockService.addNetworkData(
{
it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
- && it.getDataspace() == aDataspace
- && it.getSchemaSet() == aSchemaSet
- && it.getAnchor() == anAnchor
- && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue)
- && it.getCreatedTimestamp() == null
+ && it.getDataspace() == 'my-dataspace'
+ && it.getSchemaSet() == 'my-schema-set'
+ && it.getAnchor() == 'my-anchor'
+ && it.getCreatedTimestamp() == null
+ && it.getOperation() == expectedOperation
+ && it.getPayload() == '{"my-data-name":"my-data-value"}'
+
+ }
+ )
+ where:
+ scenario | specifiedOperation || expectedOperation
+ 'without operation field' | null || Operation.UPDATE
+ 'create operation' | 'CREATE' || Operation.CREATE
+ }
+
+ def 'Delete Event message consumption'() {
+ when: 'an delete event is received'
+ def deleteEvent =
+ EventFixtures.buildEvent([observedTimestamp: aTimestamp,
+ dataspace : 'my-dataspace',
+ schemaSet : 'my-schema-set',
+ anchor : 'my-anchor',
+ operation : Content.Operation.DELETE])
+ objectUnderTest.consume(deleteEvent)
+ then: 'network data service is requested to persisted the data change'
+ 1 * mockService.addNetworkData(
+ {
+ it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
+ && it.getDataspace() == 'my-dataspace'
+ && it.getSchemaSet() == 'my-schema-set'
+ && it.getAnchor() == 'my-anchor'
+ && it.getCreatedTimestamp() == null
+ && it.getOperation() == Operation.DELETE
+ && it.getPayload() == null
}
)
}
@@ -85,16 +116,16 @@ class DataUpdatedEventListenerSpec extends Specification {
e.getInvalidFields().size() == 4
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED,"schema", null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
+ UNEXPECTED, 'schema', null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- MISSING, "id", null, null))
+ MISSING, 'id', null, null))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString()))
+ UNEXPECTED, 'source', null, EventFixtures.defaultEventSource.toString()))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED, "type", null, EventFixtures.defaultEventType))
+ UNEXPECTED, 'type', null, EventFixtures.defaultEventType))
e.getMessage().contains(e.getInvalidFields().toString())
}
@@ -105,7 +136,7 @@ class DataUpdatedEventListenerSpec extends Specification {
.withId('my-id')
.withSchema(anEventSchema)
.withSource(anEventSource)
- .withType(anEventType)
+ .withType('my-event-type')
objectUnderTest.consume(invalidEvent)
then: 'an exception is thrown with 2 invalid fields'
def e = thrown(InvalidEventEnvelopException)
@@ -113,14 +144,14 @@ class DataUpdatedEventListenerSpec extends Specification {
e.getInvalidFields().size() == 3
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED, "schema", anEventSchema.toString(),
+ UNEXPECTED, 'schema', anEventSchema.toString(),
EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType))
+ UNEXPECTED, 'type', 'my-event-type', EventFixtures.defaultEventType))
e.getInvalidFields().contains(
new InvalidEventEnvelopException.InvalidField(
- UNEXPECTED, "source", anEventSource.toString(),
+ UNEXPECTED, 'source', anEventSource.toString(),
EventFixtures.defaultEventSource.toString()))
}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
index 7c4dee6..95c47ce 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -32,36 +34,40 @@ class EventFixtures {
static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
static String defaultEventType = 'org.onap.cps.data-updated-event'
- static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
+ static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v2')
static URI defaultEventSource = new URI('urn:cps:org.onap.cps')
- static CpsDataUpdatedEvent buildEvent(final Map map) {
- CpsDataUpdatedEvent event =
- new CpsDataUpdatedEvent()
- .withSchema(
- map.eventSchema != null ? new URI(map.eventSchema.toString()) : defaultEventSchema)
- .withId(
- map.id != null ? map.id.toString() : UUID.randomUUID().toString())
- .withType(
- map.eventType != null ? map.eventType.toString() : defaultEventType)
- .withSource(
- map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource)
- .withContent(
- new Content()
- .withObservedTimestamp(
- map.timestamp != null ? map.timestamp.toString() : currentTimestamp())
- .withDataspaceName(
- map.dataspace != null ? map.dataspace.toString() : 'a-dataspace')
- .withSchemaSetName(
- map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set')
- .withAnchorName(
- map.anchor != null ? map.anchor.toString() : 'an-anchor')
- .withData(
- new Data().withAdditionalProperty(
- map.dataName != null ? map.dataName.toString() : 'a-data-name',
- map.dataValue != null ? map.dataValue : 'a-data-value')))
+ static def defaultEventValue = [
+ eventSchema : defaultEventSchema,
+ id : UUID.randomUUID().toString(),
+ eventType : defaultEventType,
+ eventSource : defaultEventSource
+ ]
+
+ static CpsDataUpdatedEvent buildEvent(final Map inputMap) {
+ def mergedMap = defaultEventValue + inputMap
+
+ def dataExist = mergedMap.containsKey('data')
+ Data data = null
+ if (dataExist) {
+ data = new Data()
+ mergedMap.data.each { k, v -> data.withAdditionalProperty(k, v) }
+ }
+
+ def content = new Content()
+ .withObservedTimestamp(mergedMap.observedTimestamp)
+ .withDataspaceName(mergedMap.dataspace)
+ .withSchemaSetName(mergedMap.schemaSet)
+ .withAnchorName(mergedMap.anchor)
+ .withOperation(mergedMap.operation)
+ .withData(data)
- return event
+ return new CpsDataUpdatedEvent()
+ .withSchema(mergedMap.eventSchema)
+ .withId(mergedMap.id)
+ .withType(mergedMap.eventType)
+ .withSource(mergedMap.eventSource)
+ .withContent(content)
}
static String currentIsoTimestamp() {
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
index a51c4fe..0c82782 100644
--- a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -24,6 +26,7 @@ import org.onap.cps.event.model.Content
import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.onap.cps.event.model.Data
import org.onap.cps.temporal.domain.NetworkData
+import org.onap.cps.temporal.domain.Operation
import spock.lang.Specification
import java.time.OffsetDateTime
@@ -75,7 +78,7 @@ class CpsDataUpdatedEventMapperSpec extends Specification {
then: 'the result entity is not null'
result != null
and: 'the result entity payload is an empty json '
- result.getPayload() == "{}"
+ result.getPayload() == '{}'
}
def 'Mapping an event whose content data is invalid'() {
@@ -103,20 +106,33 @@ class CpsDataUpdatedEventMapperSpec extends Specification {
.withDataspaceName('a-dataspace')
.withSchemaSetName('a-schema-set')
.withAnchorName('an-anchor')
+ .withOperation(Content.Operation.CREATE)
.withData(new Data().withAdditionalProperty(aDataName, aDataValue)))
when: 'the event is mapped to an entity'
NetworkData result = objectUnderTest.eventToEntity(event)
then: 'the result entity is not null'
result != null
and: 'all result entity properties are the ones from the event'
- result.getObservedTimestamp() ==
- OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
- result.getDataspace() == event.getContent().getDataspaceName()
- result.getSchemaSet() == event.getContent().getSchemaSetName()
- result.getAnchor() == event.getContent().getAnchorName()
+ with(result) {
+ observedTimestamp ==
+ OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
+ dataspace == event.getContent().getDataspaceName()
+ schemaSet == event.getContent().getSchemaSetName()
+ operation == Operation.CREATE
+ anchor == event.getContent().getAnchorName()
+ createdTimestamp == null
+ }
result.getPayload().contains(aDataValue)
result.getPayload().contains(aDataValue)
- result.getCreatedTimestamp() == null
+ }
+
+ def 'Mapping event without operation field' () {
+ given: 'event without operation field in content'
+ def cpsDataUpdatedEvent = new CpsDataUpdatedEvent().withContent(new Content())
+ when: 'event is mapped to network data'
+ def networkData = objectUnderTest.eventToEntity(cpsDataUpdatedEvent)
+ then: 'the operation field has default UPDATE value'
+ networkData.operation == Operation.UPDATE
}
private void assertEntityPropertiesAreNull(NetworkData networkData) {
diff --git a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
index 2c7fc5e..d076f4d 100644
--- a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
@@ -21,6 +21,7 @@
package org.onap.cps.temporal.repository
import org.onap.cps.temporal.domain.NetworkData
+import org.onap.cps.temporal.domain.Operation
import org.onap.cps.temporal.repository.containers.TimescaleContainer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase
@@ -51,8 +52,13 @@ class NetworkDataRepositorySpec extends Specification {
@Autowired
NetworkDataRepository networkDataRepository
- def networkData = NetworkData.builder().observedTimestamp(observedTimestamp).dataspace(myDataspaceName)
- .schemaSet(mySchemaSetName).anchor(myAnchorName).payload(payload).build()
+ def networkData = NetworkData.builder()
+ .observedTimestamp(observedTimestamp)
+ .dataspace(myDataspaceName)
+ .schemaSet(mySchemaSetName)
+ .anchor(myAnchorName)
+ .operation(Operation.CREATE)
+ .payload(payload).build()
@Shared
TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()
@@ -62,11 +68,14 @@ class NetworkDataRepositorySpec extends Specification {
NetworkData savedData = networkDataRepository.save(networkData)
TestTransaction.end()
then: ' the saved Network Data is returned'
- savedData.getDataspace() == networkData.getDataspace()
- savedData.getSchemaSet() == networkData.getSchemaSet()
- savedData.getAnchor() == networkData.getAnchor()
- savedData.getPayload() == networkData.getPayload()
- savedData.getObservedTimestamp() == networkData.getObservedTimestamp()
+ with(savedData) {
+ dataspace == networkData.getDataspace()
+ schemaSet == networkData.getSchemaSet()
+ anchor == networkData.getAnchor()
+ payload == networkData.getPayload()
+ observedTimestamp == networkData.getObservedTimestamp()
+ operation == networkData.operation
+ }
and: ' createdTimestamp is auto populated by db '
networkData.getCreatedTimestamp() == null
savedData.getCreatedTimestamp() != null
diff --git a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
index 2e04ca8..952faf7 100644
--- a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
+++ b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
@@ -21,6 +21,7 @@
package org.onap.cps.temporal.service
import org.onap.cps.temporal.domain.NetworkDataId
+import org.onap.cps.temporal.domain.Operation
import org.onap.cps.temporal.domain.SearchCriteria
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
@@ -51,10 +52,12 @@ class NetworkDataServiceImplSpec extends Specification {
@Value('${app.query.response.max-page-size}')
int maxPageSize
- def networkData = new NetworkData()
+ def networkData = NetworkData.builder().operation(Operation.UPDATE).payload("{}").build()
def 'Add network data successfully.'() {
- given: 'network data repository is persisting network data it is asked to save'
+ given: 'a network data'
+ def networkData = NetworkData.builder().operation(operation).payload(payload).build()
+ and: 'network data repository is persisting network data'
def persistedNetworkData = new NetworkData()
persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now())
mockNetworkDataRepository.save(networkData) >> persistedNetworkData
@@ -64,17 +67,36 @@ class NetworkDataServiceImplSpec extends Specification {
result == persistedNetworkData
result.getCreatedTimestamp() != null
networkData.getCreatedTimestamp() == null
+ where: 'the following data is used'
+ operation | payload
+ Operation.CREATE | '{ "key" : "value" }'
+ Operation.UPDATE | '{ "key" : "value" }'
+ Operation.DELETE | null
+ }
+
+ def 'Error Handling: Payload missing for #operation'() {
+ given: 'a network data'
+ def networkData = NetworkData.builder().operation(operation).build()
+ when: 'a new network data is added'
+ objectUnderTest.addNetworkData(networkData)
+ then: 'Validation exception is thrown'
+ def exception = thrown(ValidationException)
+ exception.getMessage().contains('null payload')
+ where: 'following operations are used'
+ operation << [ Operation.CREATE, Operation.UPDATE]
}
def 'Add network data fails because already added'() {
given:
'network data repository is not able to create data it is asked to persist ' +
- 'and reveals it with null created timestamp on network data entity'
+ 'and reveals it with null created timestamp on network data entity'
def persistedNetworkData = new NetworkData()
persistedNetworkData.setCreatedTimestamp(null)
mockNetworkDataRepository.save(networkData) >> persistedNetworkData
and: 'existing data can be retrieved'
def existing = new NetworkData()
+ existing.setOperation(Operation.UPDATE)
+ existing.setPayload('{}')
existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1))
mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing)
when: 'a new network data is added'
@@ -86,35 +108,30 @@ class NetworkDataServiceImplSpec extends Specification {
def 'Query network data by search criteria.'() {
given: 'search criteria'
def searchCriteria = SearchCriteria.builder()
- .dataspaceName('my-dataspaceName')
- .schemaSetName('my-schemaset')
- .pagination(0, 10)
- .build()
+ .dataspaceName('my-dataspaceName')
+ .schemaSetName('my-schemaset')
+ .pagination(0, 10)
+ .build()
and: 'response from repository'
def pageFromRepository = new PageImpl<>(Collections.emptyList(), searchCriteria.getPageable(), 10)
mockNetworkDataRepository.findBySearchCriteria(searchCriteria) >> pageFromRepository
-
when: 'search is executed'
def resultPage = objectUnderTest.searchNetworkData(searchCriteria)
-
then: 'data is fetched from repository and returned'
resultPage == pageFromRepository
-
}
def 'Query network data with more than max page-size'() {
given: 'search criteria with more than max page size'
def searchCriteria = SearchCriteria.builder()
- .dataspaceName('my-dataspaceName')
- .schemaSetName('my-schemaset')
- .pagination(0, maxPageSize + 1)
- .build()
+ .dataspaceName('my-dataspaceName')
+ .schemaSetName('my-schemaset')
+ .pagination(0, maxPageSize + 1)
+ .build()
when: 'search is executed'
objectUnderTest.searchNetworkData(searchCriteria)
-
- then: 'throws error'
+ then: 'a validation exception is thrown'
thrown(ValidationException)
-
}
}
diff --git a/src/test/resources/data/network-data-changes.sql b/src/test/resources/data/network-data-changes.sql
index ce15f19..6ed52d6 100644
--- a/src/test/resources/data/network-data-changes.sql
+++ b/src/test/resources/data/network-data-changes.sql
@@ -5,25 +5,25 @@ COMMIT;
-- Test pagination data
-- Test created Before filter
-- Test observed After Filter
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
VALUES
-('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'),
-('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'),
-('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-23 23:00:01.000');
+('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'),
+('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'),
+('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-23 23:00:01.000');
-- Test sorting on multiple fields
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
VALUES
-('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000');
+('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000');
-- Test simple payload filter on multiple field
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
VALUES
-('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'),
-('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'),
-('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'),
-('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "03", "status" : "up" } ]}'::jsonb, '2021-07-24 04:00:01.000');
+('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'),
+('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'),
+('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'),
+('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-24 04:00:01.000');
COMMIT;