diff options
Diffstat (limited to 'src/test/groovy/org')
-rw-r--r-- | src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy | 2 | ||||
-rw-r--r-- | src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy | 46 |
2 files changed, 44 insertions, 4 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy index 4fc697ed..f5bc4ac4 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy @@ -35,5 +35,7 @@ class NcmpKafkaPublisherServiceSpec extends Specification { objectUnderTest.publishToNcmp(messageKey, message) then: 'no exception is thrown' noExceptionThrown() + and: 'message is published once' + 1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message) } } diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy index 54f3502c..00c8e6e7 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy @@ -20,19 +20,34 @@ package org.onap.cps.ncmp.dmi.service - +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.boot.test.context.SpringBootTest +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.DynamicPropertyRegistry import org.springframework.test.context.DynamicPropertySource import org.testcontainers.containers.KafkaContainer import org.testcontainers.spock.Testcontainers import spock.lang.Specification +import java.time.Duration + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + @SpringBootTest @Testcontainers +@DirtiesContext class NcmpKafkaPublisherSpec extends Specification { static kafkaTestContainer = new KafkaContainer() @@ -50,16 +65,31 @@ class NcmpKafkaPublisherSpec extends Specification { @Value('${app.ncmp.async-m2m.topic}') String topic - def 'Publish message'() { + KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig()) + + def 'Publish and Subscribe message'() { given: 'a sample messsage and key' def message = 'sample message' def messageKey = 'message-key' def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic) when: 'a message is published' objectUnderTest.sendMessage(messageKey, message) - then: 'no exception is thrown' - noExceptionThrown() + then: 'a message is consumed' + consumer.subscribe([topic] as List<String>) + def records = consumer.poll(Duration.ofMillis(1000)) + assert records.size() == 1 + assert messageKey == records[0].key + assert message == records[0].value + } + def kafkaConsumerConfig() { + def configs = [:] + configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name) + configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name) + configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest') + configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0]) + configs.put(GROUP_ID_CONFIG, 'test') + return configs } @DynamicPropertySource @@ -67,3 +97,11 @@ class NcmpKafkaPublisherSpec extends Specification { registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) } } + +@Configuration +class TopicConfig { + @Bean + NewTopic newTopic() { + return new NewTopic("my-topic-name", 1, (short) 1); + } +} |