summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy2
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy46
-rw-r--r--src/test/resources/application.yml2
3 files changed, 45 insertions, 5 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
index 4fc697ed..f5bc4ac4 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
@@ -35,5 +35,7 @@ class NcmpKafkaPublisherServiceSpec extends Specification {
objectUnderTest.publishToNcmp(messageKey, message)
then: 'no exception is thrown'
noExceptionThrown()
+ and: 'message is published once'
+ 1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message)
}
}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
index 54f3502c..00c8e6e7 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
@@ -20,19 +20,34 @@
package org.onap.cps.ncmp.dmi.service
-
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.spock.Testcontainers
import spock.lang.Specification
+import java.time.Duration
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
+
@SpringBootTest
@Testcontainers
+@DirtiesContext
class NcmpKafkaPublisherSpec extends Specification {
static kafkaTestContainer = new KafkaContainer()
@@ -50,16 +65,31 @@ class NcmpKafkaPublisherSpec extends Specification {
@Value('${app.ncmp.async-m2m.topic}')
String topic
- def 'Publish message'() {
+ KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig())
+
+ def 'Publish and Subscribe message'() {
given: 'a sample messsage and key'
def message = 'sample message'
def messageKey = 'message-key'
def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic)
when: 'a message is published'
objectUnderTest.sendMessage(messageKey, message)
- then: 'no exception is thrown'
- noExceptionThrown()
+ then: 'a message is consumed'
+ consumer.subscribe([topic] as List<String>)
+ def records = consumer.poll(Duration.ofMillis(1000))
+ assert records.size() == 1
+ assert messageKey == records[0].key
+ assert message == records[0].value
+ }
+ def kafkaConsumerConfig() {
+ def configs = [:]
+ configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name)
+ configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name)
+ configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest')
+ configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0])
+ configs.put(GROUP_ID_CONFIG, 'test')
+ return configs
}
@DynamicPropertySource
@@ -67,3 +97,11 @@ class NcmpKafkaPublisherSpec extends Specification {
registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
}
}
+
+@Configuration
+class TopicConfig {
+ @Bean
+ NewTopic newTopic() {
+ return new NewTopic("my-topic-name", 1, (short) 1);
+ }
+}
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index 344743b9..afaaa4d6 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -57,5 +57,5 @@ spring:
app:
ncmp:
async-m2m:
- topic: ncmp-async-m2m
+ topic: my-topic-name