From a287fc3286c38a6f472a9262777efc16798f705a Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Thu, 15 Jun 2023 03:12:29 +0100 Subject: Patch #2 : Introduce kafka template for cloud events - Introduced a new cloud kafka template for cloud events that reads it's configuration from application.yml - Kept legacy kafka template for backward compatibility utill all cps events moved to cloud event comply. - Modified application.yml producer and consumer value deserializer properties to support cloud events. - Added new cloudevents-bom used into cps-ncmp-service pom. - For the time being we will have 2 kafkatemplates (legacyEventKafkaTemplate, cloudEventKafkaTemplate) into EventsPublisher until we fully move to cloudevents for all events. Once all cps events will be cloud event compy, we have TODO task where Deprecated: legacyKafkaEventTemplate will be removed with its java configuration file KafkaTemplateConfig. Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh Change-Id: I78c15bd480db063b89c6630c46c2d3a328b4fae4 Signed-off-by: sourabh_sourabh --- cps-application/src/main/resources/application.yml | 4 +- cps-dependencies/pom.xml | 7 ++ cps-ncmp-service/pom.xml | 20 +++- .../api/impl/config/kafka/KafkaTemplateConfig.java | 127 +++++++++++++++++++++ .../cps/ncmp/api/impl/events/EventsPublisher.java | 13 ++- ...AsyncRequestResponseEventIntegrationSpec.groovy | 10 +- .../async/NcmpAsyncBatchEventConsumerSpec.groovy | 9 +- .../config/kafka/KafkaTemplateConfigSpec.groovy | 62 ++++++++++ .../impl/events/avc/AvcEventConsumerSpec.groovy | 9 +- .../impl/events/lcm/LcmEventsPublisherSpec.groovy | 9 +- .../cps/ncmp/api/kafka/MessagingBaseSpec.groovy | 29 +++-- .../src/test/resources/application.yml | 8 ++ 12 files changed, 269 insertions(+), 38 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 802da9e87c..ed71339f93 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -75,7 +75,7 @@ spring: security: protocol: PLAINTEXT producer: - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + value-serializer: io.cloudevents.kafka.CloudEventSerializer client-id: cps-core consumer: group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group} @@ -83,7 +83,7 @@ spring: value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer - spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer spring.json.use.type.headers: false jackson: diff --git a/cps-dependencies/pom.xml b/cps-dependencies/pom.xml index e06bbd7a1f..f792c9cfbc 100755 --- a/cps-dependencies/pom.xml +++ b/cps-dependencies/pom.xml @@ -146,6 +146,13 @@ cglib-nodep 3.1 + + io.cloudevents + cloudevents-bom + 2.5.0 + pom + import + org.apache.commons commons-lang3 diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml index b87fe64366..bbcb6873c3 100644 --- a/cps-ncmp-service/pom.xml +++ b/cps-ncmp-service/pom.xml @@ -41,6 +41,18 @@ org.apache.commons commons-lang3 + + io.cloudevents + cloudevents-json-jackson + + + io.cloudevents + cloudevents-kafka + + + io.cloudevents + cloudevents-spring + ${project.groupId} cps-service @@ -54,8 +66,8 @@ cps-path-parser - org.springframework - spring-web + com.hazelcast + hazelcast-spring org.mapstruct @@ -66,8 +78,8 @@ mapstruct-processor - com.hazelcast - hazelcast-spring + org.springframework + spring-web diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java new file mode 100644 index 0000000000..b76f86ebeb --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.kafka; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +/** + * kafka Configuration for legacy and cloud events. + * + * @param valid legacy event to be published over the wire. + */ +@Configuration +@EnableKafka +@RequiredArgsConstructor +public class KafkaTemplateConfig { + + private final KafkaProperties kafkaProperties; + + /** + * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into + * application.yml and replaces value-serializer by JsonSerializer. + * + * @return legacy event producer instance. + */ + @Bean + public ProducerFactory legacyEventProducerFactory() { + final Map producerConfigProperties = kafkaProperties.buildProducerProperties(); + producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined + * into application.yml and replaces deserializer-value by JsonDeserializer. + * + * @return an instance of legacy consumer factory. + */ + @Bean + public ConsumerFactory legacyEventConsumerFactory() { + final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into + * application.yml with CloudEventSerializer. + * + * @return cloud event producer instance. + */ + @Bean + public ProducerFactory cloudEventProducerFactory() { + final Map producerConfigProperties = kafkaProperties.buildProducerProperties(); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined + * into application.yml having CloudEventDeserializer as deserializer-value. + * + * @return an instance of cloud consumer factory. + */ + @Bean + public ConsumerFactory cloudEventConsumerFactory() { + final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. + * + * @return an instance of legacy Kafka template. + */ + @Bean + @Primary + public KafkaTemplate legacyEventKafkaTemplate() { + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); + return kafkaTemplate; + } + + /** + * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this. + * + * @return an instance of cloud Kafka template. + */ + @Bean + public KafkaTemplate cloudEventKafkaTemplate() { + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory()); + kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory()); + return kafkaTemplate; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index d92316dc58..7b28b4cd5f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -20,6 +20,7 @@ package org.onap.cps.ncmp.api.impl.events; +import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,7 +43,12 @@ import org.springframework.util.concurrent.ListenableFutureCallback; @RequiredArgsConstructor public class EventsPublisher { - private final KafkaTemplate eventKafkaTemplate; + /** Once all cps events will be modified to cloud compliant, will remove legacyKafkaEventTemplate with + it's java configuration file KafkaTemplateConfig. **/ + @Deprecated(forRemoval = true) + private final KafkaTemplate legacyKafkaEventTemplate; + + private final KafkaTemplate cloudEventKafkaTemplate; /** * Generic Event publisher. @@ -54,7 +60,8 @@ public class EventsPublisher { */ @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture> eventFuture + = legacyKafkaEventTemplate.send(topicName, eventKey, event); eventFuture.addCallback(handleCallback(topicName)); } @@ -70,7 +77,7 @@ public class EventsPublisher { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final ListenableFuture> eventFuture = eventKafkaTemplate.send(producerRecord); + final ListenableFuture> eventFuture = legacyKafkaEventTemplate.send(producerRecord); eventFuture.addCallback(handleCallback(topicName)); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index bcf75a29b2..fe7b3f11cb 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec @@ -34,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration @SpringBootTest(classes = [EventsPublisher, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) @@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase @SpringBean EventsPublisher cpsAsyncRequestResponseEventPublisher = - new EventsPublisher(kafkaTemplate); + new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate); @SpringBean @@ -59,18 +59,18 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase @Autowired JsonObjectMapper jsonObjectMapper - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test')) + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription' - kafkaConsumer.subscribe(['test-topic'] as List) + legacyEventKafkaConsumer.subscribe(['test-topic'] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class) when: 'the event is consumed' ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent) and: 'the topic is polled' - def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'consumed forwarded event id is the same as sent event id' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy index 28464bb91c..02071cd8cf 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy @@ -25,6 +25,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 @@ -46,7 +47,7 @@ import java.time.Duration class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher(kafkaTemplate) + EventsPublisher asyncBatchEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) @@ -57,19 +58,19 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { @Autowired RecordFilterStrategy recordFilterStrategy - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test')) + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) def static clientTopic = 'client-topic' def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' - kafkaConsumer.subscribe([clientTopic]) + legacyEventKafkaConsumer.subscribe([clientTopic]) and: 'consumer record for batch event' def consumerRecordIn = createConsumerRecord(batchEventType) when: 'the batch event is consumed and published to client specified topic' asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' - def consumerRecordOut = kafkaConsumer.poll(Duration.ofMillis(1500))[0] + def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] then: 'verifying consumed event operationID is same as published event operationID' def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy new file mode 100644 index 0000000000..ed5f161258 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.kafka; + +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer +import org.spockframework.spring.EnableSharedInjection +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.support.serializer.JsonSerializer +import spock.lang.Shared +import spock.lang.Specification + +@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig]) +@EnableSharedInjection +@EnableConfigurationProperties +class KafkaTemplateConfigSpec extends Specification { + + @Shared + @Autowired + KafkaTemplate legacyEventKafkaTemplate + + @Shared + @Autowired + KafkaTemplate cloudEventKafkaTemplate + + def 'Verify kafka template serializer and deserializer configuration for #eventType.'() { + expect: 'kafka template is instantiated' + assert kafkaTemplateInstance.properties['beanName'] == beanName + and: 'verify event key and value serializer' + assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName()) + and: 'verify event key and value deserializer' + assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName()) + where: 'the following event type is used' + eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer + 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer + 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 5f54bbe3dd..3dffac714b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.StringDeserializer import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec @@ -48,7 +49,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class) @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher(kafkaTemplate) + EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper) @@ -56,13 +57,13 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { @Autowired JsonObjectMapper jsonObjectMapper - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription on a topic' def cmEventsTopicName = 'cm-events' acvEventConsumer.cmEventsTopicName = cmEventsTopicName - kafkaConsumer.subscribe([cmEventsTopicName] as List) + legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) @@ -73,7 +74,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { when: 'the event is consumed' acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' - def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy index 93741261f6..4c6880421b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.lcm import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.events.lcm.v1.Event @@ -42,12 +43,12 @@ import java.time.Duration @DirtiesContext class LcmEventsPublisherSpec extends MessagingBaseSpec { - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) + def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) def testTopic = 'ncmp-events-test' @SpringBean - EventsPublisher lcmEventsPublisher = new EventsPublisher(kafkaTemplate) + EventsPublisher lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @Autowired JsonObjectMapper jsonObjectMapper @@ -82,11 +83,11 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec { eventSchema : eventSchema, eventSchemaVersion: eventSchemaVersion] and: 'consumer has a subscription' - kafkaConsumer.subscribe([testTopic] as List) + legacyEventKafkaConsumer.subscribe([testTopic] as List) when: 'an event is published' lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData) and: 'topic is polled' - def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record key matches the expected event key' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy index 337178e128..603b8cdda6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy @@ -20,6 +20,8 @@ package org.onap.cps.ncmp.api.kafka +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventSerializer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.spockframework.spring.SpringBean @@ -44,30 +46,33 @@ class MessagingBaseSpec extends Specification { static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka')) - def producerConfigProperties() { + @SpringBean + KafkaTemplate legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(JsonSerializer))) + + @SpringBean + KafkaTemplate cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer))) + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + + def eventProducerConfigProperties(valueSerializer) { return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0], ('retries') : 0, ('batch-size') : 16384, ('linger.ms') : 1, ('buffer.memory') : 33554432, ('key.serializer') : StringSerializer, - ('value.serializer') : JsonSerializer] + ('value.serializer') : valueSerializer] } - def consumerConfigProperties(consumerGroupId) { + def eventConsumerConfigProperties(consumerGroupId, valueSerializer) { return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0], ('key.deserializer') : StringDeserializer, - ('value.deserializer'): StringDeserializer, + ('value.deserializer'): valueSerializer, ('auto.offset.reset') : 'earliest', ('group.id') : consumerGroupId ] } - - @SpringBean - KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(producerConfigProperties())) - - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { - dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) - } } diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml index 1016f2b033..197bfda19c 100644 --- a/cps-ncmp-service/src/test/resources/application.yml +++ b/cps-ncmp-service/src/test/resources/application.yml @@ -16,6 +16,14 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END========================================================= +spring: + kafka: + producer: + value-serializer: io.cloudevents.kafka.CloudEventSerializer + consumer: + properties: + spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer + app: ncmp: avc: -- cgit 1.2.3-korg