summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/test/groovy
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/test/groovy')
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy10
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy62
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy29
6 files changed, 99 insertions, 29 deletions
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
index bcf75a29b2..fe7b3f11cb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -34,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
-
import java.time.Duration
@SpringBootTest(classes = [EventsPublisher, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
@SpringBean
EventsPublisher cpsAsyncRequestResponseEventPublisher =
- new EventsPublisher<NcmpAsyncRequestResponseEvent>(kafkaTemplate);
+ new EventsPublisher<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
@SpringBean
@@ -59,18 +59,18 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
@Autowired
JsonObjectMapper jsonObjectMapper
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
def 'Consume and forward valid message'() {
given: 'consumer has a subscription'
- kafkaConsumer.subscribe(['test-topic'] as List<String>)
+ legacyEventKafkaConsumer.subscribe(['test-topic'] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class)
when: 'the event is consumed'
ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent)
and: 'the topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'consumed forwarded event id is the same as sent event id'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
index 28464bb91c..02071cd8cf 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
@@ -46,7 +47,7 @@ import java.time.Duration
class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(kafkaTemplate)
+ EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
@@ -57,19 +58,19 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
@Autowired
RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
def static clientTopic = 'client-topic'
def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
- kafkaConsumer.subscribe([clientTopic])
+ legacyEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for batch event'
def consumerRecordIn = createConsumerRecord(batchEventType)
when: 'the batch event is consumed and published to client specified topic'
asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
- def consumerRecordOut = kafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
then: 'verifying consumed event operationID is same as published event operationID'
def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
new file mode 100644
index 0000000000..ed5f161258
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.kafka;
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaTemplateConfigSpec extends Specification {
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+ def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+ expect: 'kafka template is instantiated'
+ assert kafkaTemplateInstance.properties['beanName'] == beanName
+ and: 'verify event key and value serializer'
+ assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+ and: 'verify event key and value deserializer'
+ assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+ where: 'the following event type is used'
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 5f54bbe3dd..3dffac714b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -48,7 +49,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
@SpringBean
- EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(kafkaTemplate)
+ EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
@@ -56,13 +57,13 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
@Autowired
JsonObjectMapper jsonObjectMapper
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def 'Consume and forward valid message'() {
given: 'consumer has a subscription on a topic'
def cmEventsTopicName = 'cm-events'
acvEventConsumer.cmEventsTopicName = cmEventsTopicName
- kafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+ legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
@@ -73,7 +74,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
when: 'the event is consumed'
acvEventConsumer.consumeAndForward(consumerRecord)
and: 'the topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record can be converted to AVC event'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
index 93741261f6..4c6880421b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.lcm
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.lcm.v1.Event
@@ -42,12 +43,12 @@ import java.time.Duration
@DirtiesContext
class LcmEventsPublisherSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(kafkaTemplate)
+ EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
@@ -82,11 +83,11 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
eventSchema : eventSchema,
eventSchemaVersion: eventSchemaVersion]
and: 'consumer has a subscription'
- kafkaConsumer.subscribe([testTopic] as List<String>)
+ legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is published'
lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record key matches the expected event key'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
index 337178e128..603b8cdda6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
@@ -20,6 +20,8 @@
package org.onap.cps.ncmp.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.spockframework.spring.SpringBean
@@ -44,30 +46,33 @@ class MessagingBaseSpec extends Specification {
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ @SpringBean
+ KafkaTemplate legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(eventProducerConfigProperties(JsonSerializer)))
+
+ @SpringBean
+ KafkaTemplate cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+ dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+ }
+
+ def eventProducerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueSerializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
-
- @SpringBean
- KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
- @DynamicPropertySource
- static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
- dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
- }
}