summaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/ncmp/dmi/notifications
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/groovy/org/onap/cps/ncmp/dmi/notifications')
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy8
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy21
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy3
3 files changed, 15 insertions, 17 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
index 96e2c16d..7ca2d54c 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
@@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def setup() {
cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
- consumer.subscribe([TEST_TOPIC] as List<String>)
+ kafkaConsumer.subscribe([TEST_TOPIC] as List<String>)
}
def cleanup() {
- consumer.close()
+ kafkaConsumer.close()
}
def 'Publish and Subscribe message - success'() {
when: 'a successful event is published'
objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
@@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception)
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
index 5f7ed878..a7557bb9 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
@@ -21,12 +21,10 @@
package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
-import org.onap.cps.ncmp.dmi.service.DmiService
-import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController
-import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
@@ -40,9 +38,9 @@ import java.time.Duration
class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
@SpringBean
- DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate)
+ DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate)
- def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
+ def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
def objectMapper = new ObjectMapper()
@@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
given: 'a simulated event'
dmiService.simulateEvents(1)
and: 'a consumer subscribed to dmi-cm-events topic'
- def consumer = new KafkaConsumer<>(consumerConfigProperties('test'))
- consumer.subscribe(['dmi-cm-events'])
+ cloudEventKafkaConsumer.subscribe(['dmi-cm-events'])
when: 'the next event record is consumed'
- def record = consumer.poll(Duration.ofMillis(1500)).iterator().next()
+ def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next()
then: 'record has correct topic'
assert record.topic == 'dmi-cm-events'
and: 'the record value can be mapped to an avcEvent'
- objectMapper.readValue(record.value(), AvcEvent)
+ def dmiDataAvcEvent = record.value()
+ def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+ assert convertedAvcEvent != null
}
} \ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
index 65567ef8..59873ecf 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse
@@ -39,8 +40,6 @@ import java.time.Duration
@DirtiesContext
class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
-
def objectMapper = new ObjectMapper()
def testTopic = 'dmi-ncmp-cm-avc-subscription'