aboutsummaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
blob: 2ba011f89681bd8d34ca0bbeea504393dbc96cfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
 * ============LICENSE_START=======================================================
 * Copyright (c) 2021 Bell Canada.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
*/

package org.onap.cps.temporal.controller.event.listener.kafka

import groovy.util.logging.Slf4j
import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.onap.cps.temporal.repository.containers.TimescaleContainer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.spock.Testcontainers
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.util.concurrent.TimeUnit

/**
 * Integration test specification for data updated event listener.
 * This integration test is running database and kafka dependencies as docker containers.
 */
@SpringBootTest
@Testcontainers
@Slf4j
class DataUpdatedEventListenerIntegrationSpec extends Specification {

    @Shared
    TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()

    static kafkaTestContainer = new KafkaContainer()
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
    }
    def setupSpec() {
        kafkaTestContainer.start()
    }

    @Autowired
    KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate

    @Autowired
    JdbcTemplate jdbcTemplate

    @Value('${app.listener.data-updated.topic}')
    String topic

    // Define event data
    def aTimestamp = EventFixtures.currentIsoTimestamp()
    def aDataspace = 'my-dataspace'
    def aSchemaSet = 'my-schema-set'
    def anAnchor = 'my-anchor'

    // Define sql queries for data validation
    def sqlCount = "select count(*) from network_data"
    def sqlSelect =  "select * from network_data"
    def sqlWhereClause =
            ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
                    'and dataspace = ? ' +
                    'and schema_set = ? ' +
                    'and anchor = ?'
    def sqlCountWithConditions = sqlCount + sqlWhereClause
    def sqlSelectWithConditions = sqlSelect + sqlWhereClause

    def 'Processing a valid event'() {
        given: "no event has been proceeded"
            def initialRecordsCount =
                    jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
                            aTimestamp, aDataspace, aSchemaSet, anAnchor)
            assert (initialRecordsCount == 0)
        when: 'an event is produced'
            def event =
                    EventFixtures.buildEvent(
                            timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
            this.kafkaTemplate.send(topic, event)
        then: 'the event is proceeded'
            def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
            pollingCondition.eventually {
                def finalRecordsCount =
                        jdbcTemplate.queryForObject(
                                sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
                assert (finalRecordsCount == 1)
            }
            Map<String, Object> result =
                    jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
            log.debug("Data retrieved from db: {}", result)
    }

    def 'Processing an invalid event'() {
        given: 'the number of network data records if known'
            def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
        when: 'an invalid event is produced'
            this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
        then: 'the event is not proceeded and no more network data record is created'
            TimeUnit.SECONDS.sleep(3)
            assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
    }

    @DynamicPropertySource
    static void registerKafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
    }

}