summaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmSubscriptionDmiInEventConsumerSpec.groovy
blob: bf15f00c9986929e056d4c6fc205d75ed3f9de0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2023 Nordix Foundation
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.ncmp.dmi.notifications.cmsubscription

import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data
import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle
import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers

import java.sql.Timestamp
import java.time.Duration
import java.time.OffsetDateTime
import java.time.ZoneId

/**
 * Spock specification for {@link CmSubscriptionDmiInEventConsumer}.
 *
 * Exercises publishing of the DMI out-event, consumption of incoming cloud events and the
 * helper methods used to build the response. The 'kafkaConsumer' and 'cloudEventKafkaTemplate'
 * fixtures are not declared here; presumably they are inherited from MessagingBaseSpec.
 */
@SpringBootTest(classes = [CmSubscriptionDmiInEventConsumer])
@Testcontainers
@DirtiesContext
class CmSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {

    // Used to (de)serialize event payloads to and from JSON within the features below.
    def objectMapper = new ObjectMapper()
    // Topic the object under test publishes its response to, and the test consumer subscribes to.
    def testTopic = 'dmi-ncmp-cm-avc-subscription'

    @SpringBean
    CmSubscriptionDmiInEventConsumer objectUnderTest = new CmSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)

    /**
     * Publishes an out-event through the object under test and verifies that exactly one record
     * with the same key and the JSON-serialized event as value can be polled from the test topic.
     */
    def 'Sends subscription cloud event response successfully.'() {
        given: 'a subscription event response'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
            def responseStatus = SubscriptionStatus.Status.ACCEPTED
            def subscriptionStatuses = [new SubscriptionStatus(id: 'CmHandle1', status: responseStatus),
                                        new SubscriptionStatus(id: 'CmHandle2', status: responseStatus)]
            def cmSubscriptionDmiOutEventData = new Data(subscriptionName: 'cm-subscription-001',
                clientId: 'SCO-9989752', dmiName: 'ncmp-dmi-plugin', subscriptionStatus: subscriptionStatuses)
            def subscriptionEventResponse =
                new CmSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
        and: 'consumer has a subscription'
            kafkaConsumer.subscribe([testTopic] as List<String>)
        when: 'an event is published'
            def eventKey = UUID.randomUUID().toString()
            objectUnderTest.sendCmSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", subscriptionEventResponse)
        and: 'topic is polled'
            // Single poll with a 1.5 second upper bound; the record must arrive within that window.
            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
        then: 'poll returns one record'
            assert records.size() == 1
            def record = records.iterator().next()
        and: 'the record value matches the expected event value'
            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
            assert expectedValue == record.value
            assert eventKey == record.key
    }

    /**
     * Data-driven feature: builds a cloud event of each listed subscription type from the
     * 'avcSubscriptionCreationEvent.json' resource and verifies that consuming it does not throw.
     */
    def 'Consume valid message.'() {
        given: 'an event'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            def eventKey = UUID.randomUUID().toString()
            // Fixed epoch-millisecond value so the event time is deterministic.
            def timestamp = new Timestamp(1679521929511)
            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
            def subscriptionEvent = objectMapper.readValue(jsonData, CmSubscriptionDmiInEvent.class)
            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                .withType(subscriptionType)
                .withDataSchema(URI.create("urn:cps:" + CmSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
                .withExtension("correlationid", eventKey)
                .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
            // The record is handed directly to the consumer method; nothing is sent to 'topic-name'.
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the valid event is consumed'
            objectUnderTest.consumeCmSubscriptionDmiInEvent(testEventSent)
        then: 'no exception is thrown'
            noExceptionThrown()
        where: 'given #scenario'
            // 'scenario' is descriptive only; 'subscriptionType' is the value fed into the cloud event.
            scenario                    | subscriptionType
            'Subscription Create Event' | "subscriptionCreated"
            'Subscription Delete Event' | "subscriptionDeleted"
    }

    /**
     * Builds a cloud event that carries no data and whose data schema names the out-event class,
     * then verifies that consuming it does not throw.
     */
    def 'Consume invalid message.'() {
        given: 'an invalid event type'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            def eventKey = UUID.randomUUID().toString()
            def timestamp = new Timestamp(1679521929511)
            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
            // NOTE(review): the type equals the valid case; what differs is the data schema
            // (CmSubscriptionDmiOutEvent instead of CmSubscriptionDmiInEvent) and the absent data.
            // Presumably that is what the consumer treats as invalid - confirm against the consumer.
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                .withType("subscriptionCreated")
                .withDataSchema(URI.create("urn:cps:" + CmSubscriptionDmiOutEvent.class.getName() + ":1.0.0"))
                .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                .withExtension("correlationid", eventKey).build()
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the invalid event is consumed'
            objectUnderTest.consumeCmSubscriptionDmiInEvent(testEventSent)
        then: 'no exception is thrown and event is logged'
            // Only the absence of an exception is verified here; logging is not asserted.
            noExceptionThrown()
    }

    /**
     * Verifies that the out-event formed from the JSON fixture carries the expected client id
     * and subscription name.
     */
    def 'Form a SubscriptionEventResponse from a SubscriptionEvent.'() {
        given: 'a SubscriptionEvent'
            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
            def subscriptionEvent = objectMapper.readValue(jsonData, CmSubscriptionDmiInEvent.class)
        when: 'a SubscriptionResponseEvent is formed'
            def result = objectUnderTest.formCmSubscriptionDmiOutEvent(subscriptionEvent)
        then: 'Confirm SubscriptionEventResponse was formed as expected'
            assert result.data.clientId == "SCO-9989752"
            assert result.data.subscriptionName == "cm-subscription-001"
    }

    /**
     * Verifies that the ids of the given cm handles are returned as a set.
     */
    def 'Extract cm handle ids from cm handle successfully.'() {
        given: 'a list of cm handles'
            def cmHandleIds =
                [new CmHandle(id: 'CmHandle1', additionalProperties: ['prop-x': 'prop-valuex']),
                 new CmHandle(id: 'CmHandle2', additionalProperties: ['prop-y': 'prop-valuey'])]
        when: 'extract the cm handle ids'
            def result = objectUnderTest.extractCmHandleIds(cmHandleIds)
        then: 'cm handle ids are extracted as expected'
            def expectedCmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
            assert expectedCmHandleIds == result
    }

    /**
     * Verifies that every cm handle id is mapped to a subscription status of ACCEPTED.
     */
    def 'Populate cm handle id to subscriptionStatus successfully.'() {
        given: 'a set of cm handle ids'
            def cmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
            def responseStatus = SubscriptionStatus.Status.ACCEPTED
        when: 'populate cm handle id to subscriptionStatus'
            // '.status' on the returned collection collects the status of each element.
            def result = objectUnderTest.populateSubscriptionStatus(cmHandleIds).status
        then: 'cm handle id to subscriptionStatus populated as expected'
            def expectedStatus = [responseStatus, responseStatus]
            assert expectedStatus == result
    }
}