summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/test/groovy
diff options
context:
space:
mode:
authorseanbeirne <sean.beirne@est.tech>2023-01-13 17:13:25 +0000
committerSeán Beirne <sean.beirne@est.tech>2023-01-23 10:09:41 +0000
commit7d25644c516869c299ea8d93ea915750e5b07203 (patch)
treec1ca85b53298326c041ce9914bac1ff7c12a7d8c /cps-ncmp-service/src/test/groovy
parent556e54f5905440e4c9c4fc7fbf9c027877e51517 (diff)
[NCMP] Consume & Forward to client topic
-Consumes event from dmi-cm-events -Immediately forwards to static topic (topic selection for events comes later from subscription information) -Added Kafka test -SHOULD BE MERGED BEFORE DMI PART Issue-ID: CPS-138 Signed-off-by: JosephKeenan <joseph.keenan@est.tech> Change-Id: I0a426381e2c3f9173b8d3916960c05722ad4f77d Signed-off-by: seanbeirne <sean.beirne@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/test/groovy')
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducerIntegrationSpec.groovy83
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy10
2 files changed, 88 insertions, 5 deletions
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducerIntegrationSpec.groovy
new file mode 100644
index 0000000000..0089f777d3
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducerIntegrationSpec.groovy
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.notifications.avc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.api.impl.async.NcmpAsyncRequestResponseEventMapper
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+import java.time.Duration
+
+@SpringBootTest(classes = [AvcEventProducer, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@Testcontainers
+@DirtiesContext
+class AvcEventProducerIntegrationSpec extends MessagingBaseSpec {
+
+ @SpringBean
+ AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
+
+ @SpringBean
+ AvcEventProducer avcEventProducer = new AvcEventProducer(kafkaTemplate, avcEventMapper)
+
+ @SpringBean
+ AvcEventConsumer acvEventConsumer = new AvcEventConsumer(avcEventProducer)
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+
+ def 'Consume and forward valid message'() {
+ given: 'consumer has a subscription'
+ kafkaConsumer.subscribe(['cm-events'] as List<String>)
+ and: 'an event is sent'
+ def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
+ when: 'the event is consumed'
+ acvEventConsumer.consumeAndForward(testEventSent)
+ and: 'the topic is polled'
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ then: 'poll returns one record'
+ assert records.size() == 1
+ and: 'record can be converted to AVC event'
+ def record = records.iterator().next()
+ def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent)
+ and: 'consumed forwarded NCMP event id differs from DMI event id'
+ assert testEventSent.eventId != convertedAvcEvent.getEventId()
+ and: 'correlation id matches'
+ assert testEventSent.eventCorrelationId == convertedAvcEvent.getEventCorrelationId()
+ and: 'timestamps match'
+ assert testEventSent.eventTime == convertedAvcEvent.getEventTime()
+ and: 'target matches'
+ assert testEventSent.eventTarget == convertedAvcEvent.getEventTarget()
+ }
+
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
index f7c41ecdf2..bb0ce8745d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2022 Nordix Foundation.
+ * Copyright (c) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,14 +33,14 @@ import spock.lang.Specification
class MessagingBaseSpec extends Specification {
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
- }
-
def setupSpec() {
kafkaTestContainer.start()
}
+ def cleanupSpec() {
+ kafkaTestContainer.stop()
+ }
+
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
def producerConfigProperties() {