summaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumerSpec.groovy
blob: 4795343933b709aaf06eee8dbbae4a4f9cfd0430 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2024 Nordix Foundation
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.ncmp.dmi.notifications.cmsubscription

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
import org.slf4j.LoggerFactory
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers

import java.sql.Timestamp
import java.time.Duration
import java.time.OffsetDateTime
import java.time.ZoneId


/**
 * Spock specification for {@link CmNotificationSubscriptionDmiInEventConsumer}.
 *
 * Covers three behaviours of the consumer:
 * publishing a DMI out event to the response topic, consuming a well-formed
 * in event, and consuming an in event whose payload cannot be mapped.
 *
 * NOTE(review): {@code kafkaConsumer} and {@code cloudEventKafkaTemplate} are not declared
 * in this class; they are presumably provided by {@link MessagingBaseSpec} - confirm there.
 */
@SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
@Testcontainers
@DirtiesContext
class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
    // Used both to build cloud event payloads and to serialise the expected out event.
    def objectMapper = new ObjectMapper()
    // Topic the object under test publishes its responses to in these features.
    def testTopic = 'dmi-ncmp-cm-avc-subscription'

    @SpringBean
    CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)

    // In-memory appender that records every event logged through the CloudEventMapper logger.
    def logger = Spy(ListAppender<ILoggingEvent>)

    /**
     * Attaches the capturing appender to the CloudEventMapper logger before each feature.
     */
    void setup() {
        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
        logger.start()
    }

    /**
     * Detaches and stops every appender on the CloudEventMapper logger after each feature,
     * so captured events do not carry over into the next one.
     */
    void cleanup() {
        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
    }

    /**
     * Publishes an out event with status ACCEPTED and expects exactly one record on the
     * test topic whose key is the event key and whose value equals the JSON form of an
     * out event carrying statusCode '1' and statusMessage 'ACCEPTED'.
     */
    def 'Sends subscription cloud event response successfully.'() {
        given: 'an subscription event response'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
            def correlationId = 'test-subscriptionId#test-ncmp-dmi'
            def cmSubscriptionDmiOutEventData = new Data(statusCode: '1', statusMessage: 'ACCEPTED')
            def subscriptionEventResponse =
                    new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
        and: 'consumer has a subscription'
            kafkaConsumer.subscribe([testTopic] as List<String>)
        when: 'an event is published'
            def eventKey = UUID.randomUUID().toString()
            objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, CmNotificationSubscriptionStatus.ACCEPTED)
        and: 'topic is polled'
            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
        then: 'poll returns one record'
            assert records.size() == 1
            def record = records.iterator().next()
        and: 'the record value matches the expected event value'
            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
            assert expectedValue == record.value
            assert eventKey == record.key
    }

    /**
     * Builds a cloud event from the 'cmNotificationSubscriptionCreationEvent.json' resource for
     * each subscription type in the data table and expects the consumer to accept it
     * without throwing.
     */
    def 'Consume valid message.'() {
        given: 'an event'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            def eventKey = UUID.randomUUID().toString()
            def timestamp = new Timestamp(1679521929511)
            def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
            def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                    .withType(subscriptionType)
                    .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
                    .withExtension("correlationid", eventKey)
                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                    .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the valid event is consumed'
            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
        then: 'no exception is thrown'
            noExceptionThrown()
        where: 'given #scenario'
            scenario                    | subscriptionType
            'Subscription Create Event' | "subscriptionCreated"
            'Subscription Delete Event' | "subscriptionDeleted"
    }

    /**
     * Sends a cloud event whose data is the JSON string "/////" rather than an in event
     * object, and expects the consumer to return normally while an ERROR entry is
     * recorded on the CloudEventMapper logger.
     */
    def 'Consume invalid message.'() {
        given: 'an invalid event body'
            objectUnderTest.dmiName = 'test-ncmp-dmi'
            def eventKey = UUID.randomUUID().toString()
            def timestamp = new Timestamp(1679521929511)
            def invalidJsonBody = "/////"
            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                    .withType("subscriptionCreated")
                    .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                    .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the invalid event is consumed'
            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
        then: 'no exception propagates to the caller'
            // Stated explicitly: this feature has no thrown() condition, so an escaping
            // exception would fail it; the check is on the log output instead.
            noExceptionThrown()
        and: 'the mapping failure is logged as an error'
            def loggingEvent = getLoggingEvent()
            // Fail with a clear message when nothing was logged, rather than with a
            // NullPointerException on the property access below.
            assert loggingEvent != null
            assert loggingEvent.level == Level.ERROR
            assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
    }

    /**
     * Returns the first event captured by the list appender, or null when no event
     * has been captured (Groovy list indexing yields null for a missing index).
     */
    def getLoggingEvent() {
        return logger.list[0]
    }
}